2022-09-12 11:48:21 +00:00
package engine
import (
2023-03-13 11:37:35 +00:00
"context"
2022-09-12 11:48:21 +00:00
"errors"
"fmt"
2024-02-05 13:33:09 +00:00
"strings"
2024-02-06 10:59:50 +00:00
"sync"
2023-05-19 15:06:20 +00:00
"sync/atomic"
2022-09-12 11:48:21 +00:00
2023-04-12 14:35:10 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
2023-03-31 08:33:08 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
2024-02-06 10:59:50 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
2023-03-07 13:38:26 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
2023-04-14 06:38:29 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
2023-09-27 08:02:06 +00:00
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
2023-03-07 13:38:26 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
2023-05-31 09:24:04 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
2024-02-06 14:34:32 +00:00
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
2023-03-07 13:38:26 +00:00
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
2023-05-04 10:58:26 +00:00
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
2022-09-12 11:48:21 +00:00
"go.uber.org/zap"
2024-09-18 09:15:32 +00:00
"golang.org/x/sync/errgroup"
)
// Default sizes of the evacuation worker groups. A default is applied when
// the corresponding EvacuateShardPrm worker count is zero.
const (
	// containerWorkerCountDefault is a default value of the count of
	// concurrent container evacuation workers.
	containerWorkerCountDefault = 10

	// objectWorkerCountDefault is a default value of the count of
	// concurrent object evacuation workers.
	objectWorkerCountDefault = 10
)
2023-05-31 10:24:30 +00:00
// ErrMustBeReadOnly is returned when a shard requested for evacuation is not
// in read-only mode.
var ErrMustBeReadOnly = logicerr.New("shard must be in read-only mode")

// evacuationOperationLogField is the log field attached to evacuation log
// records to mark the operation they belong to.
var evacuationOperationLogField = zap.String("operation", "evacuation")
2023-04-14 06:38:29 +00:00
2024-02-05 13:33:09 +00:00
// EvacuateScope is an evacuation scope: a bit set selecting what is evacuated.
// Keep in sync with pkg/services/control/service.proto.
type EvacuateScope uint32

// Scope flags. They are typed constants so that no other package can reassign
// them and silently break the bit tests performed by the methods below.
const (
	// EvacuateScopeObjects includes objects into the evacuation.
	EvacuateScopeObjects EvacuateScope = 1
	// EvacuateScopeTrees includes trees into the evacuation.
	EvacuateScopeTrees EvacuateScope = 2
)

// String returns the names of the set flags joined with ";"
// (for example "objects;trees"). It returns an empty string when neither
// known flag is set.
func (s EvacuateScope) String() string {
	names := make([]string, 0, 2)
	if s.WithObjects() {
		names = append(names, "objects")
	}
	if s.WithTrees() {
		names = append(names, "trees")
	}
	return strings.Join(names, ";")
}

// WithObjects reports whether the objects flag is set.
func (s EvacuateScope) WithObjects() bool {
	return s&EvacuateScopeObjects == EvacuateScopeObjects
}

// WithTrees reports whether the trees flag is set.
func (s EvacuateScope) WithTrees() bool {
	return s&EvacuateScopeTrees == EvacuateScopeTrees
}

// TreesOnly reports whether the scope is exactly the trees flag.
func (s EvacuateScope) TreesOnly() bool {
	return s == EvacuateScopeTrees
}
2022-09-12 11:48:21 +00:00
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct {
2024-02-05 14:48:43 +00:00
ShardID [ ] * shard . ID
2024-07-03 06:55:04 +00:00
ObjectsHandler func ( context . Context , oid . Address , * objectSDK . Object ) ( bool , error )
2024-07-03 08:47:50 +00:00
TreeHandler func ( context . Context , cid . ID , string , pilorama . Forest ) ( bool , string , error )
2024-02-05 14:48:43 +00:00
IgnoreErrors bool
Async bool
Scope EvacuateScope
2024-09-18 09:15:32 +00:00
ContainerWorkerCount uint32
ObjectWorkerCount uint32
2022-09-12 11:48:21 +00:00
}
// EvacuateShardRes represents result of the EvacuateShard operation.
// Counters are stored as atomic values. Use NewEvacuateShardRes to create an
// instance whose counters can be updated; reading a nil or zero-value
// instance yields zeros.
type EvacuateShardRes struct {
	objEvacuated *atomic.Uint64
	objTotal     *atomic.Uint64
	objFailed    *atomic.Uint64
	objSkipped   *atomic.Uint64

	trEvacuated *atomic.Uint64
	trTotal     *atomic.Uint64
	trFailed    *atomic.Uint64
}

// NewEvacuateShardRes creates new EvacuateShardRes instance.
func NewEvacuateShardRes() *EvacuateShardRes {
	return &EvacuateShardRes{
		objEvacuated: new(atomic.Uint64),
		objTotal:     new(atomic.Uint64),
		objFailed:    new(atomic.Uint64),
		objSkipped:   new(atomic.Uint64),
		trEvacuated:  new(atomic.Uint64),
		trTotal:      new(atomic.Uint64),
		trFailed:     new(atomic.Uint64),
	}
}

// evacuateCounterValue reads a counter and treats a nil counter as zero, so
// that a zero-value EvacuateShardRes can be read without panicking.
func evacuateCounterValue(c *atomic.Uint64) uint64 {
	if c == nil {
		return 0
	}
	return c.Load()
}

// ObjectsEvacuated returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
func (p *EvacuateShardRes) ObjectsEvacuated() uint64 {
	if p == nil {
		return 0
	}
	return evacuateCounterValue(p.objEvacuated)
}

// ObjectsTotal returns total count objects to evacuate.
func (p *EvacuateShardRes) ObjectsTotal() uint64 {
	if p == nil {
		return 0
	}
	return evacuateCounterValue(p.objTotal)
}

// ObjectsFailed returns count of failed objects to evacuate.
func (p *EvacuateShardRes) ObjectsFailed() uint64 {
	if p == nil {
		return 0
	}
	return evacuateCounterValue(p.objFailed)
}

// ObjectsSkipped returns count of skipped objects.
func (p *EvacuateShardRes) ObjectsSkipped() uint64 {
	if p == nil {
		return 0
	}
	return evacuateCounterValue(p.objSkipped)
}

// TreesEvacuated returns amount of evacuated trees.
func (p *EvacuateShardRes) TreesEvacuated() uint64 {
	if p == nil {
		return 0
	}
	return evacuateCounterValue(p.trEvacuated)
}

// TreesTotal returns total count trees to evacuate.
func (p *EvacuateShardRes) TreesTotal() uint64 {
	if p == nil {
		return 0
	}
	return evacuateCounterValue(p.trTotal)
}

// TreesFailed returns count of failed trees to evacuate.
func (p *EvacuateShardRes) TreesFailed() uint64 {
	if p == nil {
		return 0
	}
	return evacuateCounterValue(p.trFailed)
}

// DeepCopy returns deep copy of result instance: a new instance with its own
// counters holding the values read from p. It returns nil for a nil receiver.
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
	if p == nil {
		return nil
	}

	res := NewEvacuateShardRes()
	res.objEvacuated.Store(p.ObjectsEvacuated())
	res.objTotal.Store(p.ObjectsTotal())
	res.objFailed.Store(p.ObjectsFailed())
	res.objSkipped.Store(p.ObjectsSkipped())
	res.trTotal.Store(p.TreesTotal())
	res.trEvacuated.Store(p.TreesEvacuated())
	res.trFailed.Store(p.TreesFailed())
	return res
}
// pooledShard pairs a shard with the worker pool used when putting
// objects to it.
type pooledShard struct {
	hashedShard

	// pool is the worker pool passed to putToShard together with the shard.
	pool util.WorkerPool
}
2022-10-10 17:54:14 +00:00
// errMustHaveTwoShards is returned by getActualShards when no shard would
// remain besides the evacuated ones and no handler is set for the requested
// scope.
var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
2022-09-12 11:48:21 +00:00
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
2023-05-04 10:58:26 +00:00
func ( e * StorageEngine ) Evacuate ( ctx context . Context , prm EvacuateShardPrm ) ( * EvacuateShardRes , error ) {
select {
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
default :
}
2024-02-05 12:42:30 +00:00
shardIDs := make ( [ ] string , len ( prm . ShardID ) )
for i := range prm . ShardID {
shardIDs [ i ] = prm . ShardID [ i ] . String ( )
2022-10-10 17:54:14 +00:00
}
2022-09-12 11:48:21 +00:00
2023-05-04 10:58:26 +00:00
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.Evacuate" ,
trace . WithAttributes (
attribute . StringSlice ( "shardIDs" , shardIDs ) ,
2024-02-05 12:42:30 +00:00
attribute . Bool ( "async" , prm . Async ) ,
attribute . Bool ( "ignoreErrors" , prm . IgnoreErrors ) ,
2024-02-05 13:33:09 +00:00
attribute . Stringer ( "scope" , prm . Scope ) ,
2023-05-04 10:58:26 +00:00
) )
defer span . End ( )
2024-02-26 08:19:52 +00:00
shards , err := e . getActualShards ( shardIDs , prm )
2023-03-31 08:33:08 +00:00
if err != nil {
2023-05-04 10:58:26 +00:00
return nil , err
2023-03-31 08:33:08 +00:00
}
shardsToEvacuate := make ( map [ string ] * shard . Shard )
for i := range shardIDs {
for j := range shards {
if shards [ j ] . ID ( ) . String ( ) == shardIDs [ i ] {
shardsToEvacuate [ shardIDs [ i ] ] = shards [ j ] . Shard
}
}
}
2023-05-04 10:58:26 +00:00
res := NewEvacuateShardRes ( )
2024-02-05 12:42:30 +00:00
ctx = ctxOrBackground ( ctx , prm . Async )
2023-05-04 10:58:26 +00:00
eg , egCtx , err := e . evacuateLimiter . TryStart ( ctx , shardIDs , res )
if err != nil {
return nil , err
}
2024-09-18 09:15:32 +00:00
var mtx sync . RWMutex
copyShards := func ( ) [ ] pooledShard {
mtx . RLock ( )
defer mtx . RUnlock ( )
t := make ( [ ] pooledShard , len ( shards ) )
copy ( t , shards )
return t
}
2023-05-04 10:58:26 +00:00
eg . Go ( func ( ) error {
2024-09-18 09:15:32 +00:00
return e . evacuateShards ( egCtx , shardIDs , prm , res , copyShards , shardsToEvacuate )
2023-05-04 10:58:26 +00:00
} )
2024-02-05 12:42:30 +00:00
if prm . Async {
2023-05-04 10:58:26 +00:00
return nil , nil
}
return res , eg . Wait ( )
}
// ctxOrBackground returns context.Background() when background is true and
// ctx itself otherwise.
func ctxOrBackground(ctx context.Context, background bool) context.Context {
	if !background {
		return ctx
	}
	return context.Background()
}
func ( e * StorageEngine ) evacuateShards ( ctx context . Context , shardIDs [ ] string , prm EvacuateShardPrm , res * EvacuateShardRes ,
2024-09-18 09:15:32 +00:00
shards func ( ) [ ] pooledShard , shardsToEvacuate map [ string ] * shard . Shard ,
2023-10-31 11:56:55 +00:00
) error {
2023-05-04 10:58:26 +00:00
var err error
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.evacuateShards" ,
trace . WithAttributes (
attribute . StringSlice ( "shardIDs" , shardIDs ) ,
2024-02-05 12:42:30 +00:00
attribute . Bool ( "async" , prm . Async ) ,
attribute . Bool ( "ignoreErrors" , prm . IgnoreErrors ) ,
2024-02-05 14:48:43 +00:00
attribute . Stringer ( "scope" , prm . Scope ) ,
2023-05-04 10:58:26 +00:00
) )
defer func ( ) {
span . End ( )
e . evacuateLimiter . Complete ( err )
} ( )
2023-09-27 08:02:06 +00:00
e . log . Info ( logs . EngineStartedShardsEvacuation , zap . Strings ( "shard_ids" , shardIDs ) , evacuationOperationLogField ,
2024-02-05 14:48:43 +00:00
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) , zap . Stringer ( "scope" , prm . Scope ) )
2023-03-31 08:33:08 +00:00
2024-02-06 10:59:50 +00:00
err = e . getTotals ( ctx , prm , shardsToEvacuate , res )
2023-05-04 10:58:26 +00:00
if err != nil {
2023-09-27 08:02:06 +00:00
e . log . Error ( logs . EngineShardsEvacuationFailedToCount , zap . Strings ( "shard_ids" , shardIDs ) , zap . Error ( err ) , evacuationOperationLogField ,
2024-02-05 14:48:43 +00:00
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) , zap . Stringer ( "scope" , prm . Scope ) )
2023-05-04 10:58:26 +00:00
return err
}
2023-03-31 08:33:08 +00:00
2024-09-18 09:15:32 +00:00
ctx , cancel , egShard , egContainer , egObject := e . createErrorGroupsForEvacuation ( ctx , prm )
continueLoop := true
for i := 0 ; continueLoop && i < len ( shardIDs ) ; i ++ {
select {
case <- ctx . Done ( ) :
continueLoop = false
default :
egShard . Go ( func ( ) error {
err := e . evacuateShard ( ctx , cancel , shardIDs [ i ] , prm , res , shards , shardsToEvacuate , egContainer , egObject )
if err != nil {
cancel ( err )
}
return err
} )
2023-03-31 08:33:08 +00:00
}
}
2024-09-18 09:15:32 +00:00
err = egShard . Wait ( )
if err != nil {
err = fmt . Errorf ( "shard error: %w" , err )
}
errContainer := egContainer . Wait ( )
errObject := egObject . Wait ( )
if errContainer != nil {
err = errors . Join ( err , fmt . Errorf ( "container error: %w" , errContainer ) )
}
if errObject != nil {
err = errors . Join ( err , fmt . Errorf ( "object error: %w" , errObject ) )
}
if err != nil {
e . log . Error ( logs . EngineFinishedWithErrorShardsEvacuation , zap . Error ( err ) , zap . Strings ( "shard_ids" , shardIDs ) , evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) , zap . Stringer ( "scope" , prm . Scope ) )
return err
}
2023-03-31 08:33:08 +00:00
2023-05-31 10:24:30 +00:00
e . log . Info ( logs . EngineFinishedSuccessfullyShardsEvacuation ,
zap . Strings ( "shard_ids" , shardIDs ) ,
evacuationOperationLogField ,
2024-02-05 14:48:43 +00:00
zap . Uint64 ( "total_objects" , res . ObjectsTotal ( ) ) ,
zap . Uint64 ( "evacuated_objects" , res . ObjectsEvacuated ( ) ) ,
zap . Uint64 ( "failed_objects" , res . ObjectsFailed ( ) ) ,
zap . Uint64 ( "skipped_objects" , res . ObjectsSkipped ( ) ) ,
zap . Uint64 ( "total_trees" , res . TreesTotal ( ) ) ,
zap . Uint64 ( "evacuated_trees" , res . TreesEvacuated ( ) ) ,
zap . Uint64 ( "failed_trees" , res . TreesFailed ( ) ) ,
2023-05-31 10:24:30 +00:00
)
2023-05-04 10:58:26 +00:00
return nil
}
2024-09-18 09:15:32 +00:00
// createErrorGroupsForEvacuation derives a cancellable operation context from
// ctx and creates the shard, container and object worker groups on top of it.
// The container and object groups are limited by the worker counts from prm
// (or by the defaults when a count is zero); the shard group has no limit set.
// Results are returned in the order: context, cancel function, shard group,
// container group, object group.
func (e *StorageEngine) createErrorGroupsForEvacuation(ctx context.Context, prm EvacuateShardPrm) (
	context.Context, context.CancelCauseFunc, *errgroup.Group, *errgroup.Group, *errgroup.Group,
) {
	operationCtx, cancel := context.WithCancelCause(ctx)

	// newGroup creates a group bound to the operation context.
	newGroup := func() *errgroup.Group {
		group, _ := errgroup.WithContext(operationCtx)
		return group
	}
	// limitOrDefault substitutes the default for an unset (zero) count.
	limitOrDefault := func(count, fallback uint32) int {
		if count == 0 {
			count = fallback
		}
		return int(count)
	}

	egObject := newGroup()
	egObject.SetLimit(limitOrDefault(prm.ObjectWorkerCount, objectWorkerCountDefault))

	egContainer := newGroup()
	egContainer.SetLimit(limitOrDefault(prm.ContainerWorkerCount, containerWorkerCountDefault))

	egShard := newGroup()

	return operationCtx, cancel, egShard, egContainer, egObject
}
2024-02-06 10:59:50 +00:00
func ( e * StorageEngine ) getTotals ( ctx context . Context , prm EvacuateShardPrm , shardsToEvacuate map [ string ] * shard . Shard , res * EvacuateShardRes ) error {
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.getTotals" )
2023-05-04 10:58:26 +00:00
defer span . End ( )
for _ , sh := range shardsToEvacuate {
2024-02-06 10:59:50 +00:00
if prm . Scope . WithObjects ( ) {
cnt , err := sh . LogicalObjectsCount ( ctx )
if err != nil {
if errors . Is ( err , shard . ErrDegradedMode ) {
continue
}
return err
2023-05-04 10:58:26 +00:00
}
2024-02-06 10:59:50 +00:00
res . objTotal . Add ( cnt )
}
if prm . Scope . WithTrees ( ) && sh . PiloramaEnabled ( ) {
cnt , err := pilorama . TreeCountAll ( ctx , sh )
if err != nil {
return err
}
res . trTotal . Add ( cnt )
2023-05-04 10:58:26 +00:00
}
}
return nil
2023-03-31 08:33:08 +00:00
}
2024-09-18 09:15:32 +00:00
func ( e * StorageEngine ) evacuateShard ( ctx context . Context , cancel context . CancelCauseFunc , shardID string , prm EvacuateShardPrm , res * EvacuateShardRes ,
shards func ( ) [ ] pooledShard , shardsToEvacuate map [ string ] * shard . Shard ,
egContainer * errgroup . Group , egObject * errgroup . Group ,
2023-10-31 11:56:55 +00:00
) error {
2023-05-04 10:58:26 +00:00
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.evacuateShard" ,
trace . WithAttributes (
attribute . String ( "shardID" , shardID ) ,
) )
defer span . End ( )
2024-02-06 10:59:50 +00:00
if prm . Scope . WithObjects ( ) {
2024-09-18 09:15:32 +00:00
if err := e . evacuateShardObjects ( ctx , cancel , shardID , prm , res , shards , shardsToEvacuate , egContainer , egObject ) ; err != nil {
2024-02-06 10:59:50 +00:00
return err
}
}
if prm . Scope . WithTrees ( ) && shardsToEvacuate [ shardID ] . PiloramaEnabled ( ) {
2024-02-26 08:19:52 +00:00
if err := e . evacuateShardTrees ( ctx , shardID , prm , res , shards , shardsToEvacuate ) ; err != nil {
2024-02-06 10:59:50 +00:00
return err
}
}
return nil
}
2024-09-18 09:15:32 +00:00
func ( e * StorageEngine ) evacuateShardObjects ( ctx context . Context , cancel context . CancelCauseFunc , shardID string , prm EvacuateShardPrm , res * EvacuateShardRes ,
shards func ( ) [ ] pooledShard , shardsToEvacuate map [ string ] * shard . Shard ,
egContainer * errgroup . Group , egObject * errgroup . Group ,
2024-02-06 10:59:50 +00:00
) error {
2023-03-31 08:33:08 +00:00
sh := shardsToEvacuate [ shardID ]
2024-09-18 09:15:32 +00:00
var cntPrm shard . IterateOverContainersPrm
cntPrm . Handler = func ( ctx context . Context , name [ ] byte , _ cid . ID ) error {
select {
case <- ctx . Done ( ) :
return context . Cause ( ctx )
default :
2023-03-31 08:33:08 +00:00
}
2024-09-18 09:15:32 +00:00
egContainer . Go ( func ( ) error {
var objPrm shard . IterateOverObjectsInContainerPrm
objPrm . BucketName = name
objPrm . Handler = func ( ctx context . Context , objInfo * object . Info ) error {
select {
case <- ctx . Done ( ) :
return context . Cause ( ctx )
default :
}
egObject . Go ( func ( ) error {
err := e . evacuateObject ( ctx , shardID , objInfo , prm , res , shards , shardsToEvacuate )
if err != nil {
cancel ( err )
}
return err
} )
return nil
}
err := sh . IterateOverObjectsInContainer ( ctx , objPrm )
if err != nil {
cancel ( err )
}
2023-03-31 08:33:08 +00:00
return err
2024-09-18 09:15:32 +00:00
} )
return nil
}
2023-03-31 08:33:08 +00:00
2024-09-18 09:15:32 +00:00
sh . SetEvacuationInProgress ( true )
err := sh . IterateOverContainers ( ctx , cntPrm )
if err != nil {
cancel ( err )
e . log . Error ( logs . EngineShardsEvacuationFailedToListObjects , zap . String ( "shard_id" , shardID ) , zap . Error ( err ) , evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
2023-03-31 08:33:08 +00:00
}
2024-09-18 09:15:32 +00:00
return err
2023-03-31 08:33:08 +00:00
}
2024-02-06 10:59:50 +00:00
func ( e * StorageEngine ) evacuateShardTrees ( ctx context . Context , shardID string , prm EvacuateShardPrm , res * EvacuateShardRes ,
2024-09-18 09:15:32 +00:00
getShards func ( ) [ ] pooledShard , shardsToEvacuate map [ string ] * shard . Shard ,
2024-02-06 10:59:50 +00:00
) error {
sh := shardsToEvacuate [ shardID ]
2024-09-18 09:15:32 +00:00
shards := getShards ( )
2024-02-06 10:59:50 +00:00
var listPrm pilorama . TreeListTreesPrm
first := true
for len ( listPrm . NextPageToken ) > 0 || first {
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
first = false
listRes , err := sh . TreeListTrees ( ctx , listPrm )
if err != nil {
return err
}
listPrm . NextPageToken = listRes . NextPageToken
2024-02-26 08:19:52 +00:00
if err := e . evacuateTrees ( ctx , sh , listRes . Items , prm , res , shards , shardsToEvacuate ) ; err != nil {
2024-02-06 10:59:50 +00:00
return err
}
}
return nil
}
func ( e * StorageEngine ) evacuateTrees ( ctx context . Context , sh * shard . Shard , trees [ ] pilorama . ContainerIDTreeID ,
2024-02-26 08:19:52 +00:00
prm EvacuateShardPrm , res * EvacuateShardRes , shards [ ] pooledShard , shardsToEvacuate map [ string ] * shard . Shard ,
2024-02-06 10:59:50 +00:00
) error {
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.evacuateTrees" ,
trace . WithAttributes (
attribute . Int ( "trees_count" , len ( trees ) ) ,
) )
defer span . End ( )
for _ , contTree := range trees {
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
2024-02-26 08:19:52 +00:00
success , shardID , err := e . tryEvacuateTreeLocal ( ctx , sh , contTree , prm , shards , shardsToEvacuate )
2024-02-06 10:59:50 +00:00
if err != nil {
return err
}
if success {
2024-02-06 14:34:32 +00:00
e . log . Debug ( logs . EngineShardsEvacuationTreeEvacuatedLocal ,
zap . String ( "cid" , contTree . CID . EncodeToString ( ) ) , zap . String ( "tree_id" , contTree . TreeID ) ,
zap . String ( "from_shard_id" , sh . ID ( ) . String ( ) ) , zap . String ( "to_shard_id" , shardID ) ,
evacuationOperationLogField , zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
2024-02-06 10:59:50 +00:00
res . trEvacuated . Add ( 1 )
2024-02-06 14:34:32 +00:00
continue
}
2024-07-03 08:47:50 +00:00
moved , nodePK , err := e . evacuateTreeToOtherNode ( ctx , sh , contTree , prm )
2024-02-06 14:34:32 +00:00
if err != nil {
e . log . Error ( logs . EngineShardsEvacuationFailedToMoveTree ,
zap . String ( "cid" , contTree . CID . EncodeToString ( ) ) , zap . String ( "tree_id" , contTree . TreeID ) ,
zap . String ( "from_shard_id" , sh . ID ( ) . String ( ) ) , evacuationOperationLogField ,
zap . Error ( err ) , zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
return err
2024-02-06 10:59:50 +00:00
}
2024-07-03 08:47:50 +00:00
if moved {
e . log . Debug ( logs . EngineShardsEvacuationTreeEvacuatedRemote ,
zap . String ( "cid" , contTree . CID . EncodeToString ( ) ) , zap . String ( "treeID" , contTree . TreeID ) ,
zap . String ( "from_shardID" , sh . ID ( ) . String ( ) ) , zap . String ( "to_node" , nodePK ) ,
evacuationOperationLogField , zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
res . trEvacuated . Add ( 1 )
} else if prm . IgnoreErrors {
res . trFailed . Add ( 1 )
e . log . Warn ( logs . EngineShardsEvacuationFailedToMoveTree ,
zap . String ( "cid" , contTree . CID . EncodeToString ( ) ) , zap . String ( "tree_id" , contTree . TreeID ) ,
zap . String ( "from_shard_id" , sh . ID ( ) . String ( ) ) , evacuationOperationLogField ,
zap . Error ( err ) , zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
} else {
e . log . Error ( logs . EngineShardsEvacuationFailedToMoveTree ,
zap . String ( "cid" , contTree . CID . EncodeToString ( ) ) , zap . String ( "tree_id" , contTree . TreeID ) ,
zap . String ( "from_shard_id" , sh . ID ( ) . String ( ) ) , evacuationOperationLogField ,
zap . Error ( err ) , zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
return fmt . Errorf ( "no remote nodes available to replicate tree '%s' of container %s" , contTree . TreeID , contTree . CID )
}
2024-02-06 10:59:50 +00:00
}
return nil
}
2024-07-03 08:47:50 +00:00
func ( e * StorageEngine ) evacuateTreeToOtherNode ( ctx context . Context , sh * shard . Shard , tree pilorama . ContainerIDTreeID , prm EvacuateShardPrm ) ( bool , string , error ) {
2024-02-06 14:34:32 +00:00
if prm . TreeHandler == nil {
2024-07-03 08:47:50 +00:00
return false , "" , fmt . Errorf ( "failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available" , tree . TreeID , tree . CID , sh . ID ( ) )
2024-02-06 14:34:32 +00:00
}
return prm . TreeHandler ( ctx , tree . CID , tree . TreeID , sh )
}
2024-02-06 10:59:50 +00:00
func ( e * StorageEngine ) tryEvacuateTreeLocal ( ctx context . Context , sh * shard . Shard , tree pilorama . ContainerIDTreeID ,
2024-02-26 08:19:52 +00:00
prm EvacuateShardPrm , shards [ ] pooledShard , shardsToEvacuate map [ string ] * shard . Shard ,
2024-02-06 10:59:50 +00:00
) ( bool , string , error ) {
2024-02-26 08:19:52 +00:00
target , found , err := e . findShardToEvacuateTree ( ctx , tree , shards , shardsToEvacuate )
2024-02-06 10:59:50 +00:00
if err != nil {
return false , "" , err
}
if ! found {
return false , "" , nil
}
const readBatchSize = 1000
source := make ( chan * pilorama . Move , readBatchSize )
ctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
var wg sync . WaitGroup
wg . Add ( 1 )
var applyErr error
go func ( ) {
defer wg . Done ( )
applyErr = target . TreeApplyStream ( ctx , tree . CID , tree . TreeID , source )
if applyErr != nil {
cancel ( )
}
} ( )
var height uint64
for {
op , err := sh . TreeGetOpLog ( ctx , tree . CID , tree . TreeID , height )
if err != nil {
cancel ( )
wg . Wait ( )
close ( source ) // close after cancel to ctx.Done() hits first
if prm . IgnoreErrors {
return false , "" , nil
}
return false , "" , err
}
if op . Time == 0 { // completed get op log
close ( source )
wg . Wait ( )
if applyErr == nil {
return true , target . ID ( ) . String ( ) , nil
}
if prm . IgnoreErrors {
return false , "" , nil
}
return false , "" , applyErr
}
select {
case <- ctx . Done ( ) : // apply stream failed or operation cancelled
wg . Wait ( )
if prm . IgnoreErrors {
return false , "" , nil
}
if applyErr != nil {
return false , "" , applyErr
}
return false , "" , ctx . Err ( )
case source <- & op :
}
height = op . Time + 1
}
}
// findShardToEvacuateTree returns first shard according HRW or first shard with tree exists.
func ( e * StorageEngine ) findShardToEvacuateTree ( ctx context . Context , tree pilorama . ContainerIDTreeID ,
2024-02-26 08:19:52 +00:00
shards [ ] pooledShard , shardsToEvacuate map [ string ] * shard . Shard ,
2024-02-06 10:59:50 +00:00
) ( pooledShard , bool , error ) {
2024-02-26 08:19:52 +00:00
hrw . SortHasherSliceByValue ( shards , hrw . StringHash ( tree . CID . EncodeToString ( ) ) )
2024-02-06 10:59:50 +00:00
var result pooledShard
var found bool
for _ , target := range shards {
select {
case <- ctx . Done ( ) :
return pooledShard { } , false , ctx . Err ( )
default :
}
if _ , ok := shardsToEvacuate [ target . ID ( ) . String ( ) ] ; ok {
continue
}
if ! target . PiloramaEnabled ( ) || target . GetMode ( ) . ReadOnly ( ) {
continue
}
if ! found {
result = target
found = true
}
exists , err := target . TreeExists ( ctx , tree . CID , tree . TreeID )
if err != nil {
continue
}
if exists {
return target , true , nil
}
}
return result , found , nil
}
2024-02-26 08:19:52 +00:00
func ( e * StorageEngine ) getActualShards ( shardIDs [ ] string , prm EvacuateShardPrm ) ( [ ] pooledShard , error ) {
2022-09-12 11:48:21 +00:00
e . mtx . RLock ( )
2023-03-31 08:33:08 +00:00
defer e . mtx . RUnlock ( )
for i := range shardIDs {
sh , ok := e . shards [ shardIDs [ i ] ]
2022-10-10 17:54:14 +00:00
if ! ok {
2024-02-26 08:19:52 +00:00
return nil , errShardNotFound
2022-10-10 17:54:14 +00:00
}
2022-09-12 11:48:21 +00:00
2022-10-10 17:54:14 +00:00
if ! sh . GetMode ( ) . ReadOnly ( ) {
2024-02-26 08:19:52 +00:00
return nil , ErrMustBeReadOnly
2022-10-10 17:54:14 +00:00
}
2024-02-06 10:59:50 +00:00
if prm . Scope . TreesOnly ( ) && ! sh . PiloramaEnabled ( ) {
2024-02-26 08:19:52 +00:00
return nil , fmt . Errorf ( "shard %s doesn't have pilorama enabled" , sh . ID ( ) )
2024-02-06 10:59:50 +00:00
}
2022-09-12 11:48:21 +00:00
}
2024-02-06 10:59:50 +00:00
if len ( e . shards ) - len ( shardIDs ) < 1 && prm . ObjectsHandler == nil && prm . Scope . WithObjects ( ) {
2024-02-26 08:19:52 +00:00
return nil , errMustHaveTwoShards
2022-09-12 11:48:21 +00:00
}
2024-02-06 14:34:32 +00:00
if len ( e . shards ) - len ( shardIDs ) < 1 && prm . TreeHandler == nil && prm . Scope . WithTrees ( ) {
2024-02-26 08:19:52 +00:00
return nil , errMustHaveTwoShards
2024-02-06 14:34:32 +00:00
}
2022-09-12 11:48:21 +00:00
// We must have all shards, to have correct information about their
// indexes in a sorted slice and set appropriate marks in the metabase.
// Evacuated shard is skipped during put.
shards := make ( [ ] pooledShard , 0 , len ( e . shards ) )
for id := range e . shards {
shards = append ( shards , pooledShard {
hashedShard : hashedShard ( e . shards [ id ] ) ,
pool : e . shardPools [ id ] ,
} )
}
2024-02-26 08:19:52 +00:00
return shards , nil
2023-03-31 08:33:08 +00:00
}
2024-09-18 09:15:32 +00:00
func ( e * StorageEngine ) evacuateObject ( ctx context . Context , shardID string , objInfo * object . Info , prm EvacuateShardPrm , res * EvacuateShardRes ,
getShards func ( ) [ ] pooledShard , shardsToEvacuate map [ string ] * shard . Shard ,
2023-10-31 11:56:55 +00:00
) error {
2024-09-18 09:15:32 +00:00
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.evacuateObjects" )
2023-05-04 10:58:26 +00:00
defer span . End ( )
2024-09-18 09:15:32 +00:00
select {
case <- ctx . Done ( ) :
return context . Cause ( ctx )
default :
}
2023-03-31 08:33:08 +00:00
2024-09-18 09:15:32 +00:00
shards := getShards ( )
addr := objInfo . Address
2023-03-31 08:33:08 +00:00
2024-09-18 09:15:32 +00:00
var getPrm shard . GetPrm
getPrm . SetAddress ( addr )
getPrm . SkipEvacCheck ( true )
2022-10-10 17:54:14 +00:00
2024-09-18 09:15:32 +00:00
getRes , err := shardsToEvacuate [ shardID ] . Get ( ctx , getPrm )
if err != nil {
if prm . IgnoreErrors {
res . objFailed . Add ( 1 )
return nil
2023-05-02 11:16:13 +00:00
}
2024-09-18 09:15:32 +00:00
e . log . Error ( logs . EngineShardsEvacuationFailedToReadObject , zap . String ( "address" , addr . EncodeToString ( ) ) , zap . Error ( err ) , evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
return err
}
2023-05-02 11:16:13 +00:00
2024-09-18 09:15:32 +00:00
evacuatedLocal , err := e . tryEvacuateObjectLocal ( ctx , addr , getRes . Object ( ) , shardsToEvacuate [ shardID ] , shards , shardsToEvacuate , res )
if err != nil {
return err
}
2022-09-12 11:48:21 +00:00
2024-09-18 09:15:32 +00:00
if evacuatedLocal {
return nil
}
2022-09-12 11:48:21 +00:00
2024-09-18 09:15:32 +00:00
if prm . ObjectsHandler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt . Errorf ( "%w: %s" , errPutShard , objInfo )
}
moved , err := prm . ObjectsHandler ( ctx , addr , getRes . Object ( ) )
if err != nil {
e . log . Error ( logs . EngineShardsEvacuationFailedToMoveObject , zap . String ( "address" , addr . EncodeToString ( ) ) , zap . Error ( err ) , evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
return err
}
if moved {
res . objEvacuated . Add ( 1 )
} else if prm . IgnoreErrors {
res . objFailed . Add ( 1 )
e . log . Warn ( logs . EngineShardsEvacuationFailedToMoveObject , zap . String ( "address" , addr . EncodeToString ( ) ) , zap . Error ( err ) , evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
} else {
return fmt . Errorf ( "object %s was not replicated" , addr )
2023-03-31 08:33:08 +00:00
}
return nil
}
2022-09-12 11:48:21 +00:00
2023-05-04 10:58:26 +00:00
func ( e * StorageEngine ) tryEvacuateObjectLocal ( ctx context . Context , addr oid . Address , object * objectSDK . Object , sh * shard . Shard ,
2024-02-26 08:19:52 +00:00
shards [ ] pooledShard , shardsToEvacuate map [ string ] * shard . Shard , res * EvacuateShardRes ,
2023-10-31 11:56:55 +00:00
) ( bool , error ) {
2024-02-26 08:19:52 +00:00
hrw . SortHasherSliceByValue ( shards , hrw . StringHash ( addr . EncodeToString ( ) ) )
2023-03-31 08:33:08 +00:00
for j := range shards {
2023-05-02 11:16:13 +00:00
select {
case <- ctx . Done ( ) :
return false , ctx . Err ( )
default :
}
2023-03-31 08:33:08 +00:00
if _ , ok := shardsToEvacuate [ shards [ j ] . ID ( ) . String ( ) ] ; ok {
continue
}
2024-02-20 12:55:18 +00:00
switch e . putToShard ( ctx , shards [ j ] . hashedShard , shards [ j ] . pool , addr , object ) . status {
2023-11-21 12:06:54 +00:00
case putToShardSuccess :
2024-02-05 14:48:43 +00:00
res . objEvacuated . Add ( 1 )
2023-11-21 12:06:54 +00:00
e . log . Debug ( logs . EngineObjectIsMovedToAnotherShard ,
zap . Stringer ( "from" , sh . ID ( ) ) ,
zap . Stringer ( "to" , shards [ j ] . ID ( ) ) ,
zap . Stringer ( "addr" , addr ) ,
evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
2023-05-02 11:16:13 +00:00
return true , nil
2023-11-21 12:06:54 +00:00
case putToShardExists , putToShardRemoved :
2024-02-05 14:48:43 +00:00
res . objSkipped . Add ( 1 )
2023-11-21 12:06:54 +00:00
return true , nil
default :
continue
2022-10-10 17:54:14 +00:00
}
2022-09-12 11:48:21 +00:00
}
2022-10-10 17:54:14 +00:00
2023-05-02 11:16:13 +00:00
return false , nil
2022-09-12 11:48:21 +00:00
}
2023-05-04 10:58:26 +00:00
// GetEvacuationState returns the state reported by the evacuation limiter.
// It returns the context error if ctx is already done.
func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	state := e.evacuateLimiter.GetState()
	return state, nil
}
// EnqueRunningEvacuationStop asks the evacuation limiter to cancel the
// evacuation if one is running and returns the limiter's result.
// It returns the context error if ctx is already done.
func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	return e.evacuateLimiter.CancelIfRunning()
}
2024-03-12 15:57:38 +00:00
// ResetEvacuationStatus asks the evacuation limiter to reset the evacuation
// status and returns the limiter's result.
// It returns the context error if ctx is already done.
func (e *StorageEngine) ResetEvacuationStatus(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	return e.evacuateLimiter.ResetEvacuationStatus()
}
2024-09-03 09:18:10 +00:00
// ResetEvacuationStatusForShards clears the evacuation-in-progress flag on
// every shard of the engine while holding the engine read lock.
func (e *StorageEngine) ResetEvacuationStatusForShards() {
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	for _, engineShard := range e.shards {
		engineShard.SetEvacuationInProgress(false)
	}
}