[#1356] engine: Evacuate object from shards concurrently
All checks were successful
DCO action / DCO (pull_request) Successful in 50s
Tests and linters / Run gofumpt (pull_request) Successful in 1m12s
Vulncheck / Vulncheck (pull_request) Successful in 2m3s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m25s
Build / Build Components (pull_request) Successful in 2m40s
Tests and linters / gopls check (pull_request) Successful in 2m49s
Tests and linters / Staticcheck (pull_request) Successful in 2m52s
Tests and linters / Lint (pull_request) Successful in 4m11s
Tests and linters / Tests with -race (pull_request) Successful in 4m15s
Tests and linters / Tests (pull_request) Successful in 6m18s
All checks were successful
DCO action / DCO (pull_request) Successful in 50s
Tests and linters / Run gofumpt (pull_request) Successful in 1m12s
Vulncheck / Vulncheck (pull_request) Successful in 2m3s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m25s
Build / Build Components (pull_request) Successful in 2m40s
Tests and linters / gopls check (pull_request) Successful in 2m49s
Tests and linters / Staticcheck (pull_request) Successful in 2m52s
Tests and linters / Lint (pull_request) Successful in 4m11s
Tests and linters / Tests with -race (pull_request) Successful in 4m15s
Tests and linters / Tests (pull_request) Successful in 6m18s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
d4493a6d08
commit
c8eae4ace6
8 changed files with 468 additions and 108 deletions
|
@ -21,6 +21,9 @@ const (
|
||||||
noProgressFlag = "no-progress"
|
noProgressFlag = "no-progress"
|
||||||
scopeFlag = "scope"
|
scopeFlag = "scope"
|
||||||
|
|
||||||
|
containerWorkerCountFlag = "container-worker-count"
|
||||||
|
objectWorkerCountFlag = "object-worker-count"
|
||||||
|
|
||||||
scopeAll = "all"
|
scopeAll = "all"
|
||||||
scopeObjects = "objects"
|
scopeObjects = "objects"
|
||||||
scopeTrees = "trees"
|
scopeTrees = "trees"
|
||||||
|
@ -64,12 +67,16 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
|
||||||
pk := key.Get(cmd)
|
pk := key.Get(cmd)
|
||||||
|
|
||||||
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
||||||
|
containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
|
||||||
|
objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
|
||||||
|
|
||||||
req := &control.StartShardEvacuationRequest{
|
req := &control.StartShardEvacuationRequest{
|
||||||
Body: &control.StartShardEvacuationRequest_Body{
|
Body: &control.StartShardEvacuationRequest_Body{
|
||||||
Shard_ID: getShardIDList(cmd),
|
Shard_ID: getShardIDList(cmd),
|
||||||
IgnoreErrors: ignoreErrors,
|
IgnoreErrors: ignoreErrors,
|
||||||
Scope: getEvacuationScope(cmd),
|
Scope: getEvacuationScope(cmd),
|
||||||
|
ContainerWorkerCount: containerWorkerCount,
|
||||||
|
ObjectWorkerCount: objectWorkerCount,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -371,6 +378,8 @@ func initControlStartEvacuationShardCmd() {
|
||||||
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
|
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
|
||||||
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
|
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
|
||||||
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
|
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
|
||||||
|
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
|
||||||
|
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
|
||||||
|
|
||||||
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
|
@ -24,6 +23,16 @@ import (
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// containerWorkerCountDefault is a default value of the count of
|
||||||
|
// concurrent container evacuation workers.
|
||||||
|
containerWorkerCountDefault = 10
|
||||||
|
// objectWorkerCountDefault is a default value of the count of
|
||||||
|
// concurrent object evacuation workers.
|
||||||
|
objectWorkerCountDefault = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -79,6 +88,9 @@ type EvacuateShardPrm struct {
|
||||||
IgnoreErrors bool
|
IgnoreErrors bool
|
||||||
Async bool
|
Async bool
|
||||||
Scope EvacuateScope
|
Scope EvacuateScope
|
||||||
|
|
||||||
|
ContainerWorkerCount uint32
|
||||||
|
ObjectWorkerCount uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
// EvacuateShardRes represents result of the EvacuateShard operation.
|
// EvacuateShardRes represents result of the EvacuateShard operation.
|
||||||
|
@ -189,8 +201,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultEvacuateBatchSize = 100
|
|
||||||
|
|
||||||
type pooledShard struct {
|
type pooledShard struct {
|
||||||
hashedShard
|
hashedShard
|
||||||
pool util.WorkerPool
|
pool util.WorkerPool
|
||||||
|
@ -242,8 +252,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var mtx sync.RWMutex
|
||||||
|
copyShards := func() []pooledShard {
|
||||||
|
mtx.RLock()
|
||||||
|
defer mtx.RUnlock()
|
||||||
|
t := make([]pooledShard, len(shards))
|
||||||
|
copy(t, shards)
|
||||||
|
return t
|
||||||
|
}
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate)
|
return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate)
|
||||||
})
|
})
|
||||||
|
|
||||||
if prm.Async {
|
if prm.Async {
|
||||||
|
@ -261,7 +279,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) error {
|
) error {
|
||||||
var err error
|
var err error
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
|
||||||
|
@ -287,13 +305,39 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, shardID := range shardIDs {
|
ctx, cancel, egShard, egContainer, egObject := e.createErrorGroupsForEvacuation(ctx, prm)
|
||||||
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
continueLoop := true
|
||||||
|
for i := 0; continueLoop && i < len(shardIDs); i++ {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
continueLoop = false
|
||||||
|
default:
|
||||||
|
egShard.Go(func() error {
|
||||||
|
err := e.evacuateShard(ctx, cancel, shardIDs[i], prm, res, shards, shardsToEvacuate, egContainer, egObject)
|
||||||
|
if err != nil {
|
||||||
|
cancel(err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = egShard.Wait()
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("shard error: %w", err)
|
||||||
|
}
|
||||||
|
errContainer := egContainer.Wait()
|
||||||
|
errObject := egObject.Wait()
|
||||||
|
if errContainer != nil {
|
||||||
|
err = errors.Join(err, fmt.Errorf("container error: %w", errContainer))
|
||||||
|
}
|
||||||
|
if errObject != nil {
|
||||||
|
err = errors.Join(err, fmt.Errorf("object error: %w", errObject))
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
|
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
|
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
|
||||||
zap.Strings("shard_ids", shardIDs),
|
zap.Strings("shard_ids", shardIDs),
|
||||||
|
@ -309,6 +353,27 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) (
|
||||||
|
context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group,
|
||||||
|
) {
|
||||||
|
operationCtx, cancel := context.WithCancelCause(ctx)
|
||||||
|
egObject, _ := errgroup.WithContext(operationCtx)
|
||||||
|
objectWorkerCount := prm.ObjectWorkerCount
|
||||||
|
if objectWorkerCount == 0 {
|
||||||
|
objectWorkerCount = objectWorkerCountDefault
|
||||||
|
}
|
||||||
|
egObject.SetLimit(int(objectWorkerCount))
|
||||||
|
egContainer, _ := errgroup.WithContext(operationCtx)
|
||||||
|
containerWorkerCount := prm.ContainerWorkerCount
|
||||||
|
if containerWorkerCount == 0 {
|
||||||
|
containerWorkerCount = containerWorkerCountDefault
|
||||||
|
}
|
||||||
|
egContainer.SetLimit(int(containerWorkerCount))
|
||||||
|
egShard, _ := errgroup.WithContext(operationCtx)
|
||||||
|
|
||||||
|
return operationCtx, cancel, egShard, egContainer, egObject
|
||||||
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
|
func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
@ -335,8 +400,9 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateShard(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
|
egContainer *errgroup.Group, egObject *errgroup.Group,
|
||||||
) error {
|
) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
|
@ -345,11 +411,10 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if prm.Scope.WithObjects() {
|
if prm.Scope.WithObjects() {
|
||||||
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
if err := e.evacuateShardObjects(ctx, cancel, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
|
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
|
||||||
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -359,44 +424,60 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, cancel context.CancelCauseFunc, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
|
egContainer *errgroup.Group, egObject *errgroup.Group,
|
||||||
) error {
|
) error {
|
||||||
var listPrm shard.ListWithCursorPrm
|
|
||||||
listPrm.WithCount(defaultEvacuateBatchSize)
|
|
||||||
|
|
||||||
sh := shardsToEvacuate[shardID]
|
sh := shardsToEvacuate[shardID]
|
||||||
sh.SetEvacuationInProgress(true)
|
var cntPrm shard.IterateOverContainersPrm
|
||||||
|
cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error {
|
||||||
var c *meta.Cursor
|
select {
|
||||||
for {
|
case <-ctx.Done():
|
||||||
listPrm.WithCursor(c)
|
return context.Cause(ctx)
|
||||||
|
default:
|
||||||
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
|
}
|
||||||
// because ListWithCursor works only with the metabase.
|
egContainer.Go(func() error {
|
||||||
listRes, err := sh.ListWithCursor(ctx, listPrm)
|
var objPrm shard.IterateOverObjectsInContainerPrm
|
||||||
|
objPrm.BucketName = name
|
||||||
|
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return context.Cause(ctx)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
egObject.Go(func() error {
|
||||||
|
err := e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
|
cancel(err)
|
||||||
break
|
|
||||||
}
|
}
|
||||||
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
|
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
|
||||||
return err
|
return err
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err := sh.IterateOverObjectsInContainer(ctx, objPrm)
|
||||||
|
if err != nil {
|
||||||
|
cancel(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
})
|
||||||
|
|
||||||
c = listRes.Cursor()
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sh.SetEvacuationInProgress(true)
|
||||||
|
err := sh.IterateOverContainers(ctx, cntPrm)
|
||||||
|
if err != nil {
|
||||||
|
cancel(err)
|
||||||
|
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
|
||||||
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) error {
|
) error {
|
||||||
sh := shardsToEvacuate[shardID]
|
sh := shardsToEvacuate[shardID]
|
||||||
|
shards := getShards()
|
||||||
|
|
||||||
var listPrm pilorama.TreeListTreesPrm
|
var listPrm pilorama.TreeListTreesPrm
|
||||||
first := true
|
first := true
|
||||||
|
@ -637,51 +718,49 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
||||||
return shards, nil
|
return shards, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||||
) error {
|
) error {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
|
||||||
trace.WithAttributes(
|
|
||||||
attribute.Int("objects_count", len(toEvacuate)),
|
|
||||||
))
|
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
for i := range toEvacuate {
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return context.Cause(ctx)
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
addr := toEvacuate[i].Address
|
|
||||||
|
shards := getShards()
|
||||||
|
addr := objInfo.Address
|
||||||
|
|
||||||
var getPrm shard.GetPrm
|
var getPrm shard.GetPrm
|
||||||
getPrm.SetAddress(addr)
|
getPrm.SetAddress(addr)
|
||||||
getPrm.SkipEvacCheck(true)
|
getPrm.SkipEvacCheck(true)
|
||||||
|
|
||||||
getRes, err := sh.Get(ctx, getPrm)
|
getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if prm.IgnoreErrors {
|
if prm.IgnoreErrors {
|
||||||
res.objFailed.Add(1)
|
res.objFailed.Add(1)
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res)
|
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if evacuatedLocal {
|
if evacuatedLocal {
|
||||||
continue
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if prm.ObjectsHandler == nil {
|
if prm.ObjectsHandler == nil {
|
||||||
// Do not check ignoreErrors flag here because
|
// Do not check ignoreErrors flag here because
|
||||||
// ignoring errors on put make this command kinda useless.
|
// ignoring errors on put make this command kinda useless.
|
||||||
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
|
return fmt.Errorf("%w: %s", errPutShard, objInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
|
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
|
||||||
|
@ -699,7 +778,6 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("object %s was not replicated", addr)
|
return fmt.Errorf("object %s was not replicated", addr)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -174,13 +176,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
|
||||||
errReplication := errors.New("handler error")
|
errReplication := errors.New("handler error")
|
||||||
|
|
||||||
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
|
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
|
||||||
var n uint64
|
var n atomic.Uint64
|
||||||
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
|
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
|
||||||
if n == max {
|
if n.Load() == max {
|
||||||
return false, errReplication
|
return false, errReplication
|
||||||
}
|
}
|
||||||
|
|
||||||
n++
|
n.Add(1)
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
if addr == objectCore.AddressOf(objects[i]) {
|
if addr == objectCore.AddressOf(objects[i]) {
|
||||||
require.Equal(t, objects[i], obj)
|
require.Equal(t, objects[i], obj)
|
||||||
|
@ -314,6 +316,36 @@ func TestEvacuateCancellation(t *testing.T) {
|
||||||
require.Equal(t, uint64(0), res.ObjectsEvacuated())
|
require.Equal(t, uint64(0), res.ObjectsEvacuated())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEvacuateCancellationByError(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
e, ids, _ := newEngineEvacuate(t, 2, 10)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, e.Close(context.Background()))
|
||||||
|
}()
|
||||||
|
|
||||||
|
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||||
|
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||||
|
|
||||||
|
var prm EvacuateShardPrm
|
||||||
|
prm.ShardID = ids[1:2]
|
||||||
|
var once atomic.Bool
|
||||||
|
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) (bool, error) {
|
||||||
|
var err error
|
||||||
|
flag := true
|
||||||
|
if once.CompareAndSwap(false, true) {
|
||||||
|
err = errors.New("test error")
|
||||||
|
flag = false
|
||||||
|
}
|
||||||
|
return flag, err
|
||||||
|
}
|
||||||
|
prm.Scope = EvacuateScopeObjects
|
||||||
|
prm.ObjectWorkerCount = 2
|
||||||
|
prm.ContainerWorkerCount = 2
|
||||||
|
|
||||||
|
_, err := e.Evacuate(context.Background(), prm)
|
||||||
|
require.ErrorContains(t, err, "test error")
|
||||||
|
}
|
||||||
|
|
||||||
func TestEvacuateSingleProcess(t *testing.T) {
|
func TestEvacuateSingleProcess(t *testing.T) {
|
||||||
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -531,6 +563,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
||||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||||
|
|
||||||
|
mutex := sync.Mutex{}
|
||||||
evacuatedTreeOps := make(map[string][]*pilorama.Move)
|
evacuatedTreeOps := make(map[string][]*pilorama.Move)
|
||||||
var prm EvacuateShardPrm
|
var prm EvacuateShardPrm
|
||||||
prm.ShardID = ids
|
prm.ShardID = ids
|
||||||
|
@ -545,7 +578,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
||||||
if op.Time == 0 {
|
if op.Time == 0 {
|
||||||
return true, "", nil
|
return true, "", nil
|
||||||
}
|
}
|
||||||
|
mutex.Lock()
|
||||||
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
|
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
|
||||||
|
mutex.Unlock()
|
||||||
height = op.Time + 1
|
height = op.Time + 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -61,6 +62,20 @@ func (l ListRes) Cursor() *Cursor {
|
||||||
return l.cursor
|
return l.cursor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
||||||
|
type IterateOverContainersPrm struct {
|
||||||
|
// Handler function executed upon containers in db.
|
||||||
|
Handler func(context.Context, []byte, cid.ID) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||||
|
type IterateOverObjectsInContainerPrm struct {
|
||||||
|
// BucketName container's bucket name.
|
||||||
|
BucketName []byte
|
||||||
|
// Handler function executed upon objects in db.
|
||||||
|
Handler func(context.Context, *objectcore.Info) error
|
||||||
|
}
|
||||||
|
|
||||||
// ListWithCursor lists physical objects available in metabase starting from
|
// ListWithCursor lists physical objects available in metabase starting from
|
||||||
// cursor. Includes objects of all types. Does not include inhumed objects.
|
// cursor. Includes objects of all types. Does not include inhumed objects.
|
||||||
// Use cursor value from response for consecutive requests.
|
// Use cursor value from response for consecutive requests.
|
||||||
|
@ -259,3 +274,155 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte)
|
||||||
|
|
||||||
return rawID, name[0]
|
return rawID, name[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IterateOverContainers lists physical containers available in metabase starting from first.
|
||||||
|
func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Bool("has_handler", prm.Handler != nil),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
return db.iterateOverContainers(ctx, tx, prm)
|
||||||
|
})
|
||||||
|
success = err == nil
|
||||||
|
return metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error {
|
||||||
|
var containerID cid.ID
|
||||||
|
for _, prefix := range [][]byte{{byte(primaryPrefix)}, {byte(lockersPrefix)}, {byte(tombstonePrefix)}} {
|
||||||
|
c := tx.Cursor()
|
||||||
|
for name, _ := c.Seek(prefix); name != nil && bytes.HasPrefix(name, prefix); name, _ = c.Next() {
|
||||||
|
cidRaw, _ := parseContainerIDWithPrefix(&containerID, name)
|
||||||
|
if cidRaw == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
bktName := make([]byte, len(name))
|
||||||
|
copy(bktName, name)
|
||||||
|
var cnt cid.ID
|
||||||
|
copy(cnt[:], containerID[:])
|
||||||
|
err := prm.Handler(ctx, bktName, cnt)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IterateOverObjectsInContainer iterate over physical objects available in metabase starting from first.
|
||||||
|
func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Bool("has_handler", prm.Handler != nil),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
db.modeMtx.RLock()
|
||||||
|
defer db.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
if db.mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
var containerID cid.ID
|
||||||
|
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
|
||||||
|
if cidRaw == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
return db.iterateOverObjectsInContainer(ctx, tx, cidRaw, prefix, containerID, prm)
|
||||||
|
})
|
||||||
|
success = err == nil
|
||||||
|
return metaerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, cidRaw []byte, prefix byte,
|
||||||
|
containerID cid.ID, prm IterateOverObjectsInContainerPrm,
|
||||||
|
) error {
|
||||||
|
bkt := tx.Bucket(prm.BucketName)
|
||||||
|
if bkt == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||||
|
garbageBkt := tx.Bucket(garbageBucketName)
|
||||||
|
c := bkt.Cursor()
|
||||||
|
k, v := c.First()
|
||||||
|
|
||||||
|
var objType objectSDK.Type
|
||||||
|
|
||||||
|
switch prefix {
|
||||||
|
case primaryPrefix:
|
||||||
|
objType = objectSDK.TypeRegular
|
||||||
|
case lockersPrefix:
|
||||||
|
objType = objectSDK.TypeLock
|
||||||
|
case tombstonePrefix:
|
||||||
|
objType = objectSDK.TypeTombstone
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for ; k != nil; k, v = c.Next() {
|
||||||
|
var obj oid.ID
|
||||||
|
if err := obj.Decode(k); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
var isLinkingObj bool
|
||||||
|
var ecInfo *objectcore.ECInfo
|
||||||
|
if objType == objectSDK.TypeRegular {
|
||||||
|
var o objectSDK.Object
|
||||||
|
if err := o.Unmarshal(v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
isLinkingObj = isLinkObject(&o)
|
||||||
|
ecHeader := o.ECHeader()
|
||||||
|
if ecHeader != nil {
|
||||||
|
ecInfo = &objectcore.ECInfo{
|
||||||
|
ParentID: ecHeader.Parent(),
|
||||||
|
Index: ecHeader.Index(),
|
||||||
|
Total: ecHeader.Total(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var a oid.Address
|
||||||
|
a.SetContainer(containerID)
|
||||||
|
a.SetObject(obj)
|
||||||
|
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
|
||||||
|
err := prm.Handler(ctx, &objInfo)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -34,6 +34,20 @@ func (r ListContainersRes) Containers() []cid.ID {
|
||||||
return r.containers
|
return r.containers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
||||||
|
type IterateOverContainersPrm struct {
|
||||||
|
// Handler function executed upon containers in db.
|
||||||
|
Handler func(context.Context, []byte, cid.ID) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||||
|
type IterateOverObjectsInContainerPrm struct {
|
||||||
|
// BucketName container's bucket name.
|
||||||
|
BucketName []byte
|
||||||
|
// Handler function executed upon containers in db.
|
||||||
|
Handler func(context.Context, *objectcore.Info) error
|
||||||
|
}
|
||||||
|
|
||||||
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
||||||
type ListWithCursorPrm struct {
|
type ListWithCursorPrm struct {
|
||||||
count uint32
|
count uint32
|
||||||
|
@ -164,3 +178,54 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
|
||||||
cursor: res.Cursor(),
|
cursor: res.Cursor(),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IterateOverContainers lists physical containers presented in shard.
|
||||||
|
func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Bool("has_handler", prm.Handler != nil),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
var metaPrm meta.IterateOverContainersPrm
|
||||||
|
metaPrm.Handler = prm.Handler
|
||||||
|
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not iterate over containers: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name.
|
||||||
|
func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.Bool("has_handler", prm.Handler != nil),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
s.m.RLock()
|
||||||
|
defer s.m.RUnlock()
|
||||||
|
|
||||||
|
if s.info.Mode.NoMetabase() {
|
||||||
|
return ErrDegradedMode
|
||||||
|
}
|
||||||
|
|
||||||
|
var metaPrm meta.IterateOverObjectsInContainerPrm
|
||||||
|
metaPrm.BucketName = prm.BucketName
|
||||||
|
metaPrm.Handler = prm.Handler
|
||||||
|
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not iterate over objects: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -29,6 +29,8 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
|
||||||
TreeHandler: s.replicateTree,
|
TreeHandler: s.replicateTree,
|
||||||
Async: true,
|
Async: true,
|
||||||
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
|
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
|
||||||
|
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
|
||||||
|
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = s.s.Evacuate(ctx, prm)
|
_, err = s.s.Evacuate(ctx, prm)
|
||||||
|
|
|
@ -394,6 +394,10 @@ message StartShardEvacuationRequest {
|
||||||
bool ignore_errors = 2;
|
bool ignore_errors = 2;
|
||||||
// Evacuation scope.
|
// Evacuation scope.
|
||||||
uint32 scope = 3;
|
uint32 scope = 3;
|
||||||
|
// Count of concurrent container evacuation workers.
|
||||||
|
uint32 container_worker_count = 4;
|
||||||
|
// Count of concurrent object evacuation workers.
|
||||||
|
uint32 object_worker_count = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
Body body = 1;
|
Body body = 1;
|
||||||
|
|
BIN
pkg/services/control/service_frostfs.pb.go
generated
BIN
pkg/services/control/service_frostfs.pb.go
generated
Binary file not shown.
Loading…
Reference in a new issue