2022-09-12 11:48:21 +00:00
package engine
import (
2023-03-13 11:37:35 +00:00
"context"
2022-09-12 11:48:21 +00:00
"errors"
"fmt"
2024-02-05 13:33:09 +00:00
"strings"
2024-02-06 10:59:50 +00:00
"sync"
2023-05-19 15:06:20 +00:00
"sync/atomic"
2022-09-12 11:48:21 +00:00
2023-04-12 14:35:10 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
2023-03-31 08:33:08 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
2023-03-07 13:38:26 +00:00
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
2024-02-06 10:59:50 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
2023-03-07 13:38:26 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
2023-04-14 06:38:29 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
2023-09-27 08:02:06 +00:00
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
2023-03-07 13:38:26 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
2023-05-31 09:24:04 +00:00
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
2024-02-06 14:34:32 +00:00
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
2023-03-07 13:38:26 +00:00
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
2023-05-04 10:58:26 +00:00
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
2022-09-12 11:48:21 +00:00
"go.uber.org/zap"
)
2023-05-31 10:24:30 +00:00
var (
// ErrMustBeReadOnly is returned when a shard selected for evacuation
// is not in read-only mode.
ErrMustBeReadOnly = logicerr . New ( "shard must be in read-only mode" )
// evacuationOperationLogField is the "operation"="evacuation" log field
// attached to the evacuation log records in this file.
evacuationOperationLogField = zap . String ( "operation" , "evacuation" )
)
2023-04-14 06:38:29 +00:00
2024-02-05 13:33:09 +00:00
// EvacuateScope is an evacuation scope: a bit mask selecting what kind of
// data an evacuation moves. Keep in sync with pkg/services/control/service.proto.
type EvacuateScope uint32

var (
	// EvacuateScopeObjects is the scope bit that selects objects.
	EvacuateScopeObjects EvacuateScope = 1
	// EvacuateScopeTrees is the scope bit that selects trees.
	EvacuateScopeTrees EvacuateScope = 2
)

// String returns the names of the scope bits that are set ("objects",
// "trees"), joined with ";". A scope with none of the known bits set
// yields an empty string.
func (s EvacuateScope) String() string {
	var sb strings.Builder
	if s.WithObjects() {
		sb.WriteString("objects")
	}
	if s.WithTrees() {
		// A separator is needed only when "objects" was already written.
		if sb.Len() > 0 {
			sb.WriteString(";")
		}
		sb.WriteString("trees")
	}
	return sb.String()
}

// WithObjects reports whether the objects bit is set in the scope.
func (s EvacuateScope) WithObjects() bool {
	return s&EvacuateScopeObjects == EvacuateScopeObjects
}

// WithTrees reports whether the trees bit is set in the scope.
func (s EvacuateScope) WithTrees() bool {
	return s&EvacuateScopeTrees == EvacuateScopeTrees
}

// TreesOnly reports whether the scope is exactly the trees bit.
func (s EvacuateScope) TreesOnly() bool {
	return s == EvacuateScopeTrees
}
2022-09-12 11:48:21 +00:00
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct {
2024-02-05 14:48:43 +00:00
ShardID [ ] * shard . ID
ObjectsHandler func ( context . Context , oid . Address , * objectSDK . Object ) error
2024-02-06 14:34:32 +00:00
TreeHandler func ( context . Context , cid . ID , string , pilorama . Forest ) ( string , error )
2024-02-05 14:48:43 +00:00
IgnoreErrors bool
Async bool
Scope EvacuateScope
2022-09-12 11:48:21 +00:00
}
// EvacuateShardRes represents result of the EvacuateShard operation.
// Every counter is an atomic value; instances must be created with
// NewEvacuateShardRes (or DeepCopy) so that all counters are allocated.
type EvacuateShardRes struct {
	objEvacuated *atomic.Uint64
	objTotal     *atomic.Uint64
	objFailed    *atomic.Uint64
	objSkipped   *atomic.Uint64

	trEvacuated *atomic.Uint64
	trTotal     *atomic.Uint64
	trFailed    *atomic.Uint64
}

// NewEvacuateShardRes creates new EvacuateShardRes instance with all
// counters allocated and set to zero.
func NewEvacuateShardRes() *EvacuateShardRes {
	return &EvacuateShardRes{
		objEvacuated: new(atomic.Uint64),
		objTotal:     new(atomic.Uint64),
		objFailed:    new(atomic.Uint64),
		objSkipped:   new(atomic.Uint64),
		trEvacuated:  new(atomic.Uint64),
		trTotal:      new(atomic.Uint64),
		trFailed:     new(atomic.Uint64),
	}
}

// ObjectsEvacuated returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
// A nil receiver yields 0.
func (p *EvacuateShardRes) ObjectsEvacuated() uint64 {
	if p == nil {
		return 0
	}
	return p.objEvacuated.Load()
}

// ObjectsTotal returns total count of objects to evacuate.
// A nil receiver yields 0.
func (p *EvacuateShardRes) ObjectsTotal() uint64 {
	if p == nil {
		return 0
	}
	return p.objTotal.Load()
}

// ObjectsFailed returns count of objects that failed to evacuate.
// A nil receiver yields 0.
func (p *EvacuateShardRes) ObjectsFailed() uint64 {
	if p == nil {
		return 0
	}
	return p.objFailed.Load()
}

// ObjectsSkipped returns count of skipped objects.
// A nil receiver yields 0.
func (p *EvacuateShardRes) ObjectsSkipped() uint64 {
	if p == nil {
		return 0
	}
	return p.objSkipped.Load()
}

// TreesEvacuated returns amount of evacuated trees.
// A nil receiver yields 0.
func (p *EvacuateShardRes) TreesEvacuated() uint64 {
	if p == nil {
		return 0
	}
	return p.trEvacuated.Load()
}

// TreesTotal returns total count of trees to evacuate.
// A nil receiver yields 0.
func (p *EvacuateShardRes) TreesTotal() uint64 {
	if p == nil {
		return 0
	}
	return p.trTotal.Load()
}

// TreesFailed returns count of trees that failed to evacuate.
// A nil receiver yields 0.
func (p *EvacuateShardRes) TreesFailed() uint64 {
	if p == nil {
		return 0
	}
	return p.trFailed.Load()
}

// DeepCopy returns deep copy of result instance: a new instance with its
// own counters holding the values loaded from p. A nil receiver yields nil.
// Each counter is loaded separately, so the copy is not a single atomic
// snapshot of all counters.
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
	if p == nil {
		return nil
	}

	// Allocate through the constructor so the set of counters is defined
	// in exactly one place.
	res := NewEvacuateShardRes()

	res.objEvacuated.Store(p.objEvacuated.Load())
	res.objTotal.Store(p.objTotal.Load())
	res.objFailed.Store(p.objFailed.Load())
	res.objSkipped.Store(p.objSkipped.Load())
	res.trTotal.Store(p.trTotal.Load())
	res.trEvacuated.Store(p.trEvacuated.Load())
	res.trFailed.Store(p.trFailed.Load())
	return res
}
// defaultEvacuateBatchSize is the count passed to the shard listing request
// when objects of an evacuated shard are listed.
const defaultEvacuateBatchSize = 100
// pooledShard couples a shard with the worker pool that is passed along
// with it when an object is put to that shard.
type pooledShard struct {
hashedShard
pool util . WorkerPool
}
2022-10-10 17:54:14 +00:00
// errMustHaveTwoShards is returned when no shard would remain besides the
// evacuated ones and no handler is set for the requested scope.
var errMustHaveTwoShards = errors . New ( "must have at least 1 spare shard" )
2022-09-12 11:48:21 +00:00
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
2023-05-04 10:58:26 +00:00
func ( e * StorageEngine ) Evacuate ( ctx context . Context , prm EvacuateShardPrm ) ( * EvacuateShardRes , error ) {
select {
case <- ctx . Done ( ) :
return nil , ctx . Err ( )
default :
}
2024-02-05 12:42:30 +00:00
shardIDs := make ( [ ] string , len ( prm . ShardID ) )
for i := range prm . ShardID {
shardIDs [ i ] = prm . ShardID [ i ] . String ( )
2022-10-10 17:54:14 +00:00
}
2022-09-12 11:48:21 +00:00
2023-05-04 10:58:26 +00:00
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.Evacuate" ,
trace . WithAttributes (
attribute . StringSlice ( "shardIDs" , shardIDs ) ,
2024-02-05 12:42:30 +00:00
attribute . Bool ( "async" , prm . Async ) ,
attribute . Bool ( "ignoreErrors" , prm . IgnoreErrors ) ,
2024-02-05 13:33:09 +00:00
attribute . Stringer ( "scope" , prm . Scope ) ,
2023-05-04 10:58:26 +00:00
) )
defer span . End ( )
2024-02-05 14:48:43 +00:00
shards , weights , err := e . getActualShards ( shardIDs , prm )
2023-03-31 08:33:08 +00:00
if err != nil {
2023-05-04 10:58:26 +00:00
return nil , err
2023-03-31 08:33:08 +00:00
}
shardsToEvacuate := make ( map [ string ] * shard . Shard )
for i := range shardIDs {
for j := range shards {
if shards [ j ] . ID ( ) . String ( ) == shardIDs [ i ] {
shardsToEvacuate [ shardIDs [ i ] ] = shards [ j ] . Shard
}
}
}
2023-05-04 10:58:26 +00:00
res := NewEvacuateShardRes ( )
2024-02-05 12:42:30 +00:00
ctx = ctxOrBackground ( ctx , prm . Async )
2023-05-04 10:58:26 +00:00
eg , egCtx , err := e . evacuateLimiter . TryStart ( ctx , shardIDs , res )
if err != nil {
return nil , err
}
eg . Go ( func ( ) error {
return e . evacuateShards ( egCtx , shardIDs , prm , res , shards , weights , shardsToEvacuate )
} )
2024-02-05 12:42:30 +00:00
if prm . Async {
2023-05-04 10:58:26 +00:00
return nil , nil
}
return res , eg . Wait ( )
}
// ctxOrBackground returns context.Background() when background is true
// and ctx itself otherwise.
func ctxOrBackground(ctx context.Context, background bool) context.Context {
	if !background {
		return ctx
	}
	return context.Background()
}
func ( e * StorageEngine ) evacuateShards ( ctx context . Context , shardIDs [ ] string , prm EvacuateShardPrm , res * EvacuateShardRes ,
2023-10-31 11:56:55 +00:00
shards [ ] pooledShard , weights [ ] float64 , shardsToEvacuate map [ string ] * shard . Shard ,
) error {
2023-05-04 10:58:26 +00:00
var err error
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.evacuateShards" ,
trace . WithAttributes (
attribute . StringSlice ( "shardIDs" , shardIDs ) ,
2024-02-05 12:42:30 +00:00
attribute . Bool ( "async" , prm . Async ) ,
attribute . Bool ( "ignoreErrors" , prm . IgnoreErrors ) ,
2024-02-05 14:48:43 +00:00
attribute . Stringer ( "scope" , prm . Scope ) ,
2023-05-04 10:58:26 +00:00
) )
defer func ( ) {
span . End ( )
e . evacuateLimiter . Complete ( err )
} ( )
2023-09-27 08:02:06 +00:00
e . log . Info ( logs . EngineStartedShardsEvacuation , zap . Strings ( "shard_ids" , shardIDs ) , evacuationOperationLogField ,
2024-02-05 14:48:43 +00:00
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) , zap . Stringer ( "scope" , prm . Scope ) )
2023-03-31 08:33:08 +00:00
2024-02-06 10:59:50 +00:00
err = e . getTotals ( ctx , prm , shardsToEvacuate , res )
2023-05-04 10:58:26 +00:00
if err != nil {
2023-09-27 08:02:06 +00:00
e . log . Error ( logs . EngineShardsEvacuationFailedToCount , zap . Strings ( "shard_ids" , shardIDs ) , zap . Error ( err ) , evacuationOperationLogField ,
2024-02-05 14:48:43 +00:00
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) , zap . Stringer ( "scope" , prm . Scope ) )
2023-05-04 10:58:26 +00:00
return err
}
2023-03-31 08:33:08 +00:00
for _ , shardID := range shardIDs {
2023-05-04 10:58:26 +00:00
if err = e . evacuateShard ( ctx , shardID , prm , res , shards , weights , shardsToEvacuate ) ; err != nil {
2024-02-05 14:48:43 +00:00
e . log . Error ( logs . EngineFinishedWithErrorShardsEvacuation , zap . Error ( err ) , zap . Strings ( "shard_ids" , shardIDs ) , evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) , zap . Stringer ( "scope" , prm . Scope ) )
2023-05-04 10:58:26 +00:00
return err
2023-03-31 08:33:08 +00:00
}
}
2023-05-31 10:24:30 +00:00
e . log . Info ( logs . EngineFinishedSuccessfullyShardsEvacuation ,
zap . Strings ( "shard_ids" , shardIDs ) ,
evacuationOperationLogField ,
2024-02-05 14:48:43 +00:00
zap . Uint64 ( "total_objects" , res . ObjectsTotal ( ) ) ,
zap . Uint64 ( "evacuated_objects" , res . ObjectsEvacuated ( ) ) ,
zap . Uint64 ( "failed_objects" , res . ObjectsFailed ( ) ) ,
zap . Uint64 ( "skipped_objects" , res . ObjectsSkipped ( ) ) ,
zap . Uint64 ( "total_trees" , res . TreesTotal ( ) ) ,
zap . Uint64 ( "evacuated_trees" , res . TreesEvacuated ( ) ) ,
zap . Uint64 ( "failed_trees" , res . TreesFailed ( ) ) ,
2023-05-31 10:24:30 +00:00
)
2023-05-04 10:58:26 +00:00
return nil
}
2024-02-06 10:59:50 +00:00
func ( e * StorageEngine ) getTotals ( ctx context . Context , prm EvacuateShardPrm , shardsToEvacuate map [ string ] * shard . Shard , res * EvacuateShardRes ) error {
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.getTotals" )
2023-05-04 10:58:26 +00:00
defer span . End ( )
for _ , sh := range shardsToEvacuate {
2024-02-06 10:59:50 +00:00
if prm . Scope . WithObjects ( ) {
cnt , err := sh . LogicalObjectsCount ( ctx )
if err != nil {
if errors . Is ( err , shard . ErrDegradedMode ) {
continue
}
return err
2023-05-04 10:58:26 +00:00
}
2024-02-06 10:59:50 +00:00
res . objTotal . Add ( cnt )
}
if prm . Scope . WithTrees ( ) && sh . PiloramaEnabled ( ) {
cnt , err := pilorama . TreeCountAll ( ctx , sh )
if err != nil {
return err
}
res . trTotal . Add ( cnt )
2023-05-04 10:58:26 +00:00
}
}
return nil
2023-03-31 08:33:08 +00:00
}
2023-03-13 11:37:35 +00:00
func ( e * StorageEngine ) evacuateShard ( ctx context . Context , shardID string , prm EvacuateShardPrm , res * EvacuateShardRes ,
2023-10-31 11:56:55 +00:00
shards [ ] pooledShard , weights [ ] float64 , shardsToEvacuate map [ string ] * shard . Shard ,
) error {
2023-05-04 10:58:26 +00:00
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.evacuateShard" ,
trace . WithAttributes (
attribute . String ( "shardID" , shardID ) ,
) )
defer span . End ( )
2024-02-06 10:59:50 +00:00
if prm . Scope . WithObjects ( ) {
if err := e . evacuateShardObjects ( ctx , shardID , prm , res , shards , weights , shardsToEvacuate ) ; err != nil {
return err
}
}
if prm . Scope . WithTrees ( ) && shardsToEvacuate [ shardID ] . PiloramaEnabled ( ) {
if err := e . evacuateShardTrees ( ctx , shardID , prm , res , shards , weights , shardsToEvacuate ) ; err != nil {
return err
}
}
return nil
}
func ( e * StorageEngine ) evacuateShardObjects ( ctx context . Context , shardID string , prm EvacuateShardPrm , res * EvacuateShardRes ,
shards [ ] pooledShard , weights [ ] float64 , shardsToEvacuate map [ string ] * shard . Shard ,
) error {
2023-03-31 08:33:08 +00:00
var listPrm shard . ListWithCursorPrm
listPrm . WithCount ( defaultEvacuateBatchSize )
sh := shardsToEvacuate [ shardID ]
var c * meta . Cursor
for {
listPrm . WithCursor ( c )
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
2023-06-06 09:27:19 +00:00
listRes , err := sh . ListWithCursor ( ctx , listPrm )
2023-03-31 08:33:08 +00:00
if err != nil {
if errors . Is ( err , meta . ErrEndOfListing ) || errors . Is ( err , shard . ErrDegradedMode ) {
break
}
2023-09-27 08:02:06 +00:00
e . log . Error ( logs . EngineShardsEvacuationFailedToListObjects , zap . String ( "shard_id" , shardID ) , zap . Error ( err ) , evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
2023-03-31 08:33:08 +00:00
return err
}
2023-03-13 11:37:35 +00:00
if err = e . evacuateObjects ( ctx , sh , listRes . AddressList ( ) , prm , res , shards , weights , shardsToEvacuate ) ; err != nil {
2023-03-31 08:33:08 +00:00
return err
}
c = listRes . Cursor ( )
}
return nil
}
2024-02-06 10:59:50 +00:00
// evacuateShardTrees lists the trees of the shard page by page and
// evacuates every page. At least one page is always requested; listing
// continues while the returned next page token is not empty. The context
// is checked before every page.
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) error {
	sh := shardsToEvacuate[shardID]

	var listPrm pilorama.TreeListTreesPrm
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		page, err := sh.TreeListTrees(ctx, listPrm)
		if err != nil {
			return err
		}
		listPrm.NextPageToken = page.NextPageToken

		err = e.evacuateTrees(ctx, sh, page.Items, prm, res, shards, weights, shardsToEvacuate)
		if err != nil {
			return err
		}

		if len(listPrm.NextPageToken) == 0 {
			return nil
		}
	}
}
func ( e * StorageEngine ) evacuateTrees ( ctx context . Context , sh * shard . Shard , trees [ ] pilorama . ContainerIDTreeID ,
prm EvacuateShardPrm , res * EvacuateShardRes , shards [ ] pooledShard , weights [ ] float64 ,
shardsToEvacuate map [ string ] * shard . Shard ,
) error {
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.evacuateTrees" ,
trace . WithAttributes (
attribute . Int ( "trees_count" , len ( trees ) ) ,
) )
defer span . End ( )
for _ , contTree := range trees {
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
2024-02-06 14:34:32 +00:00
success , shardID , err := e . tryEvacuateTreeLocal ( ctx , sh , contTree , prm , shards , weights , shardsToEvacuate )
2024-02-06 10:59:50 +00:00
if err != nil {
return err
}
if success {
2024-02-06 14:34:32 +00:00
e . log . Debug ( logs . EngineShardsEvacuationTreeEvacuatedLocal ,
zap . String ( "cid" , contTree . CID . EncodeToString ( ) ) , zap . String ( "tree_id" , contTree . TreeID ) ,
zap . String ( "from_shard_id" , sh . ID ( ) . String ( ) ) , zap . String ( "to_shard_id" , shardID ) ,
evacuationOperationLogField , zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
2024-02-06 10:59:50 +00:00
res . trEvacuated . Add ( 1 )
2024-02-06 14:34:32 +00:00
continue
}
nodePK , err := e . evacuateTreeToOtherNode ( ctx , sh , contTree , prm )
if err != nil {
e . log . Error ( logs . EngineShardsEvacuationFailedToMoveTree ,
zap . String ( "cid" , contTree . CID . EncodeToString ( ) ) , zap . String ( "tree_id" , contTree . TreeID ) ,
zap . String ( "from_shard_id" , sh . ID ( ) . String ( ) ) , evacuationOperationLogField ,
zap . Error ( err ) , zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
return err
2024-02-06 10:59:50 +00:00
}
2024-02-06 14:34:32 +00:00
e . log . Debug ( logs . EngineShardsEvacuationTreeEvacuatedRemote ,
zap . String ( "cid" , contTree . CID . EncodeToString ( ) ) , zap . String ( "treeID" , contTree . TreeID ) ,
zap . String ( "from_shardID" , sh . ID ( ) . String ( ) ) , zap . String ( "to_node" , nodePK ) ,
evacuationOperationLogField , zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
res . trEvacuated . Add ( 1 )
2024-02-06 10:59:50 +00:00
}
return nil
}
2024-02-06 14:34:32 +00:00
// evacuateTreeToOtherNode passes the tree to prm.TreeHandler and returns
// whatever the handler returns. When no handler is set, an error is
// returned instead.
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (string, error) {
	if handler := prm.TreeHandler; handler != nil {
		return handler(ctx, tree.CID, tree.TreeID, sh)
	}
	return "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
}
2024-02-06 10:59:50 +00:00
// tryEvacuateTreeLocal tries to copy the tree from sh to another local shard.
//
// The target is chosen by findShardToEvacuateTree; when none is found the
// result is (false, "", nil). Otherwise the operation log of the tree is
// read from sh with TreeGetOpLog, starting at height 0, and every operation
// is sent through a buffered channel to a goroutine that runs
// target.TreeApplyStream.
//
// It returns (true, <target shard ID>, nil) when the whole log was read and
// the apply stream finished without an error. When reading or applying
// fails, or the context is done, the failure is returned as an error unless
// prm.IgnoreErrors is set, in which case (false, "", nil) is returned.
func ( e * StorageEngine ) tryEvacuateTreeLocal ( ctx context . Context , sh * shard . Shard , tree pilorama . ContainerIDTreeID ,
prm EvacuateShardPrm , shards [ ] pooledShard , weights [ ] float64 , shardsToEvacuate map [ string ] * shard . Shard ,
) ( bool , string , error ) {
target , found , err := e . findShardToEvacuateTree ( ctx , tree , shards , weights , shardsToEvacuate )
if err != nil {
return false , "" , err
}
if ! found {
return false , "" , nil
}
const readBatchSize = 1000
// source carries the operations from the reading loop below to the apply goroutine.
source := make ( chan * pilorama . Move , readBatchSize )
// The derived context lets either side stop the other: the goroutine
// cancels it when applying fails, the loop cancels it when reading fails.
ctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
var wg sync . WaitGroup
wg . Add ( 1 )
// applyErr is written by the goroutine only and read here only after wg.Wait().
var applyErr error
go func ( ) {
defer wg . Done ( )
applyErr = target . TreeApplyStream ( ctx , tree . CID , tree . TreeID , source )
if applyErr != nil {
cancel ( )
}
} ( )
// height is the position in the operation log requested next.
var height uint64
for {
op , err := sh . TreeGetOpLog ( ctx , tree . CID , tree . TreeID , height )
if err != nil {
cancel ( )
wg . Wait ( )
close ( source ) // closed only after the apply goroutine was cancelled and has returned
if prm . IgnoreErrors {
return false , "" , nil
}
return false , "" , err
}
if op . Time == 0 { // completed get op log
// Closing the channel tells the apply goroutine there are no more operations.
close ( source )
wg . Wait ( )
if applyErr == nil {
return true , target . ID ( ) . String ( ) , nil
}
if prm . IgnoreErrors {
return false , "" , nil
}
return false , "" , applyErr
}
select {
case <- ctx . Done ( ) : // apply stream failed or operation cancelled
wg . Wait ( )
if prm . IgnoreErrors {
return false , "" , nil
}
if applyErr != nil {
return false , "" , applyErr
}
return false , "" , ctx . Err ( )
case source <- & op :
}
// Continue reading right after the operation that was just sent.
height = op . Time + 1
}
}
// findShardToEvacuateTree picks the shard a tree should be moved to.
//
// The shards slice is sorted in place by HRW weight for the container ID.
// Shards being evacuated, shards without pilorama and read-only shards are
// not considered. The first remaining shard that already has the tree is
// returned; when there is none, the first remaining shard in HRW order is
// returned. The boolean result is false when no shard is suitable. Errors
// of the tree existence check are ignored; the only error returned is the
// context error.
func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) (pooledShard, bool, error) {
	hrw.SortHasherSliceByWeightValue(shards, weights, hrw.StringHash(tree.CID.EncodeToString()))

	var (
		fallback    pooledShard
		hasFallback bool
	)
	for i := range shards {
		if err := ctx.Err(); err != nil {
			return pooledShard{}, false, err
		}

		candidate := shards[i]
		if _, evacuating := shardsToEvacuate[candidate.ID().String()]; evacuating {
			continue
		}
		if !candidate.PiloramaEnabled() || candidate.GetMode().ReadOnly() {
			continue
		}

		// Remember the first suitable shard in case no shard has the tree yet.
		if !hasFallback {
			fallback, hasFallback = candidate, true
		}

		if exists, err := candidate.TreeExists(ctx, tree.CID, tree.TreeID); err == nil && exists {
			return candidate, true, nil
		}
	}
	return fallback, hasFallback, nil
}
2024-02-05 14:48:43 +00:00
func ( e * StorageEngine ) getActualShards ( shardIDs [ ] string , prm EvacuateShardPrm ) ( [ ] pooledShard , [ ] float64 , error ) {
2022-09-12 11:48:21 +00:00
e . mtx . RLock ( )
2023-03-31 08:33:08 +00:00
defer e . mtx . RUnlock ( )
for i := range shardIDs {
sh , ok := e . shards [ shardIDs [ i ] ]
2022-10-10 17:54:14 +00:00
if ! ok {
2023-03-31 08:33:08 +00:00
return nil , nil , errShardNotFound
2022-10-10 17:54:14 +00:00
}
2022-09-12 11:48:21 +00:00
2022-10-10 17:54:14 +00:00
if ! sh . GetMode ( ) . ReadOnly ( ) {
2023-04-14 06:38:29 +00:00
return nil , nil , ErrMustBeReadOnly
2022-10-10 17:54:14 +00:00
}
2024-02-06 10:59:50 +00:00
if prm . Scope . TreesOnly ( ) && ! sh . PiloramaEnabled ( ) {
return nil , nil , fmt . Errorf ( "shard %s doesn't have pilorama enabled" , sh . ID ( ) )
}
2022-09-12 11:48:21 +00:00
}
2024-02-06 10:59:50 +00:00
if len ( e . shards ) - len ( shardIDs ) < 1 && prm . ObjectsHandler == nil && prm . Scope . WithObjects ( ) {
2023-03-31 08:33:08 +00:00
return nil , nil , errMustHaveTwoShards
2022-09-12 11:48:21 +00:00
}
2024-02-06 14:34:32 +00:00
if len ( e . shards ) - len ( shardIDs ) < 1 && prm . TreeHandler == nil && prm . Scope . WithTrees ( ) {
return nil , nil , errMustHaveTwoShards
}
2022-09-12 11:48:21 +00:00
// We must have all shards, to have correct information about their
// indexes in a sorted slice and set appropriate marks in the metabase.
// Evacuated shard is skipped during put.
shards := make ( [ ] pooledShard , 0 , len ( e . shards ) )
for id := range e . shards {
shards = append ( shards , pooledShard {
hashedShard : hashedShard ( e . shards [ id ] ) ,
pool : e . shardPools [ id ] ,
} )
}
weights := make ( [ ] float64 , 0 , len ( shards ) )
for i := range shards {
weights = append ( weights , e . shardWeight ( shards [ i ] . Shard ) )
}
2023-03-31 08:33:08 +00:00
return shards , weights , nil
}
2023-03-13 11:37:35 +00:00
func ( e * StorageEngine ) evacuateObjects ( ctx context . Context , sh * shard . Shard , toEvacuate [ ] object . AddressWithType , prm EvacuateShardPrm , res * EvacuateShardRes ,
2023-10-31 11:56:55 +00:00
shards [ ] pooledShard , weights [ ] float64 , shardsToEvacuate map [ string ] * shard . Shard ,
) error {
2023-05-04 10:58:26 +00:00
ctx , span := tracing . StartSpanFromContext ( ctx , "StorageEngine.evacuateObjects" ,
trace . WithAttributes (
attribute . Int ( "objects_count" , len ( toEvacuate ) ) ,
) )
defer span . End ( )
2023-03-31 08:33:08 +00:00
for i := range toEvacuate {
2023-05-02 11:16:13 +00:00
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
2023-03-31 08:33:08 +00:00
addr := toEvacuate [ i ] . Address
var getPrm shard . GetPrm
getPrm . SetAddress ( addr )
2023-03-13 11:37:35 +00:00
getRes , err := sh . Get ( ctx , getPrm )
2023-03-31 08:33:08 +00:00
if err != nil {
2024-02-05 12:42:30 +00:00
if prm . IgnoreErrors {
2024-02-05 14:48:43 +00:00
res . objFailed . Add ( 1 )
2023-03-31 08:33:08 +00:00
continue
2022-10-10 17:54:14 +00:00
}
2023-09-27 08:02:06 +00:00
e . log . Error ( logs . EngineShardsEvacuationFailedToReadObject , zap . String ( "address" , addr . EncodeToString ( ) ) , zap . Error ( err ) , evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
2023-03-31 08:33:08 +00:00
return err
2022-10-10 17:54:14 +00:00
}
2023-05-04 10:58:26 +00:00
evacuatedLocal , err := e . tryEvacuateObjectLocal ( ctx , addr , getRes . Object ( ) , sh , shards , weights , shardsToEvacuate , res )
2023-05-02 11:16:13 +00:00
if err != nil {
return err
}
if evacuatedLocal {
2023-03-31 08:33:08 +00:00
continue
}
2022-09-12 11:48:21 +00:00
2024-02-05 14:48:43 +00:00
if prm . ObjectsHandler == nil {
2023-03-31 08:33:08 +00:00
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return fmt . Errorf ( "%w: %s" , errPutShard , toEvacuate [ i ] )
}
2022-09-12 11:48:21 +00:00
2024-02-05 14:48:43 +00:00
err = prm . ObjectsHandler ( ctx , addr , getRes . Object ( ) )
2023-03-31 08:33:08 +00:00
if err != nil {
2023-09-27 08:02:06 +00:00
e . log . Error ( logs . EngineShardsEvacuationFailedToMoveObject , zap . String ( "address" , addr . EncodeToString ( ) ) , zap . Error ( err ) , evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
2023-03-31 08:33:08 +00:00
return err
}
2024-02-05 14:48:43 +00:00
res . objEvacuated . Add ( 1 )
2023-03-31 08:33:08 +00:00
}
return nil
}
2022-09-12 11:48:21 +00:00
2023-05-04 10:58:26 +00:00
func ( e * StorageEngine ) tryEvacuateObjectLocal ( ctx context . Context , addr oid . Address , object * objectSDK . Object , sh * shard . Shard ,
2023-10-31 11:56:55 +00:00
shards [ ] pooledShard , weights [ ] float64 , shardsToEvacuate map [ string ] * shard . Shard , res * EvacuateShardRes ,
) ( bool , error ) {
2023-06-02 12:39:16 +00:00
hrw . SortHasherSliceByWeightValue ( shards , weights , hrw . StringHash ( addr . EncodeToString ( ) ) )
2023-03-31 08:33:08 +00:00
for j := range shards {
2023-05-02 11:16:13 +00:00
select {
case <- ctx . Done ( ) :
return false , ctx . Err ( )
default :
}
2023-03-31 08:33:08 +00:00
if _ , ok := shardsToEvacuate [ shards [ j ] . ID ( ) . String ( ) ] ; ok {
continue
}
2023-11-21 12:06:54 +00:00
switch e . putToShard ( ctx , shards [ j ] . hashedShard , j , shards [ j ] . pool , addr , object ) . status {
case putToShardSuccess :
2024-02-05 14:48:43 +00:00
res . objEvacuated . Add ( 1 )
2023-11-21 12:06:54 +00:00
e . log . Debug ( logs . EngineObjectIsMovedToAnotherShard ,
zap . Stringer ( "from" , sh . ID ( ) ) ,
zap . Stringer ( "to" , shards [ j ] . ID ( ) ) ,
zap . Stringer ( "addr" , addr ) ,
evacuationOperationLogField ,
zap . String ( "trace_id" , tracingPkg . GetTraceID ( ctx ) ) )
2023-05-02 11:16:13 +00:00
return true , nil
2023-11-21 12:06:54 +00:00
case putToShardExists , putToShardRemoved :
2024-02-05 14:48:43 +00:00
res . objSkipped . Add ( 1 )
2023-11-21 12:06:54 +00:00
return true , nil
default :
continue
2022-10-10 17:54:14 +00:00
}
2022-09-12 11:48:21 +00:00
}
2022-10-10 17:54:14 +00:00
2023-05-02 11:16:13 +00:00
return false , nil
2022-09-12 11:48:21 +00:00
}
2023-05-04 10:58:26 +00:00
// GetEvacuationState returns the state reported by the evacuation limiter.
// If ctx is already done, its error is returned instead.
func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return e.evacuateLimiter.GetState(), nil
}
// EnqueRunningEvacuationStop forwards to the evacuation limiter's
// CancelIfRunning and returns its result. If ctx is already done, its
// error is returned instead and the limiter is not called.
func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	return e.evacuateLimiter.CancelIfRunning()
}