Pilorama migration #960

Merged
fyrchik merged 8 commits from dstepanov-yadro/frostfs-node:feat/pilorama_migrate into master 2024-09-04 19:51:06 +00:00
20 changed files with 1973 additions and 877 deletions


@ -19,6 +19,11 @@ import (
const ( const (
awaitFlag = "await" awaitFlag = "await"
noProgressFlag = "no-progress" noProgressFlag = "no-progress"
scopeFlag = "scope"
scopeAll = "all"
scopeObjects = "objects"
scopeTrees = "trees"
) )
var evacuationShardCmd = &cobra.Command{ var evacuationShardCmd = &cobra.Command{
@ -57,6 +62,7 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
Body: &control.StartShardEvacuationRequest_Body{ Body: &control.StartShardEvacuationRequest_Body{
Shard_ID: getShardIDList(cmd), Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors, IgnoreErrors: ignoreErrors,
Scope: getEvacuationScope(cmd),
}, },
} }
@ -82,6 +88,22 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
} }
} }
func getEvacuationScope(cmd *cobra.Command) uint32 {
rawScope, err := cmd.Flags().GetString(scopeFlag)
commonCmd.ExitOnErr(cmd, "Invalid scope value: %w", err)
switch rawScope {
case scopeAll:
return uint32(control.StartShardEvacuationRequest_Body_OBJECTS) | uint32(control.StartShardEvacuationRequest_Body_TREES)
case scopeObjects:
return uint32(control.StartShardEvacuationRequest_Body_OBJECTS)
case scopeTrees:
return uint32(control.StartShardEvacuationRequest_Body_TREES)
default:
commonCmd.ExitOnErr(cmd, "Invalid scope value: %w", fmt.Errorf("unknown scope %s", rawScope))
}
return uint32(control.StartShardEvacuationRequest_Body_NONE)
}
func getEvacuateShardStatus(cmd *cobra.Command, _ []string) { func getEvacuateShardStatus(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd) pk := key.Get(cmd)
req := &control.GetShardEvacuationStatusRequest{ req := &control.GetShardEvacuationStatusRequest{
@ -219,15 +241,17 @@ func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusRespo
func appendEstimation(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { func appendEstimation(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING || if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING ||
resp.GetBody().GetDuration() == nil || resp.GetBody().GetDuration() == nil ||
resp.GetBody().GetTotal() == 0 || (resp.GetBody().GetTotalObjects() == 0 && resp.GetBody().GetTotalTrees() == 0) ||
resp.GetBody().GetEvacuated()+resp.GetBody().GetFailed()+resp.GetBody().GetSkipped() == 0 { (resp.GetBody().GetEvacuatedObjects()+resp.GetBody().GetFailedObjects()+resp.GetBody().GetSkippedObjects() == 0 &&
resp.GetBody().GetEvacuatedTrees()+resp.GetBody().GetFailedTrees() == 0) {
return return
} }
durationSeconds := float64(resp.GetBody().GetDuration().GetSeconds()) durationSeconds := float64(resp.GetBody().GetDuration().GetSeconds())
evacuated := float64(resp.GetBody().GetEvacuated() + resp.GetBody().GetFailed() + resp.GetBody().GetSkipped()) evacuated := float64(resp.GetBody().GetEvacuatedObjects() + resp.GetBody().GetFailedObjects() + resp.GetBody().GetSkippedObjects() +
resp.GetBody().GetEvacuatedTrees() + resp.GetBody().GetFailedTrees())
avgObjEvacuationTimeSeconds := durationSeconds / evacuated avgObjEvacuationTimeSeconds := durationSeconds / evacuated
objectsLeft := float64(resp.GetBody().GetTotal()) - evacuated objectsLeft := float64(resp.GetBody().GetTotalObjects()+resp.GetBody().GetTotalTrees()) - evacuated
leftSeconds := avgObjEvacuationTimeSeconds * objectsLeft leftSeconds := avgObjEvacuationTimeSeconds * objectsLeft
leftMinutes := int(leftSeconds / 60) leftMinutes := int(leftSeconds / 60)
@ -285,11 +309,14 @@ func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusR
} }
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) { func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
sb.WriteString(fmt.Sprintf(" Evacuated %d objects out of %d, failed to evacuate: %d, skipped: %d.", sb.WriteString(fmt.Sprintf(" Evacuated %d objects out of %d, failed to evacuate: %d, skipped: %d; evacuated %d trees out of %d, failed to evacuate: %d.",
resp.GetBody().GetEvacuated(), resp.GetBody().GetEvacuatedObjects(),
resp.GetBody().GetTotal(), resp.GetBody().GetTotalObjects(),
resp.GetBody().GetFailed(), resp.GetBody().GetFailedObjects(),
resp.GetBody().GetSkipped())) resp.GetBody().GetSkippedObjects(),
resp.GetBody().GetEvacuatedTrees(),
resp.GetBody().GetTotalTrees(),
resp.GetBody().GetFailedTrees()))
} }
func initControlEvacuationShardCmd() { func initControlEvacuationShardCmd() {
@ -309,6 +336,7 @@ func initControlStartEvacuationShardCmd() {
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
flags.Bool(shardAllFlag, false, "Process all shards") flags.Bool(shardAllFlag, false, "Process all shards")
flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects") flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed") flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag)) flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))


@ -9,23 +9,13 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server" controlSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
const serviceNameControl = "control" const serviceNameControl = "control"
type treeSynchronizer struct {
treeSvc *tree.Service
}
func (t treeSynchronizer) Synchronize(ctx context.Context, cnr cid.ID, treeID string) error {
return t.treeSvc.SynchronizeTree(ctx, cnr, treeID)
}
func initControlService(c *cfg) { func initControlService(c *cfg) {
endpoint := controlconfig.GRPC(c.appCfg).Endpoint() endpoint := controlconfig.GRPC(c.appCfg).Endpoint()
if endpoint == controlconfig.GRPCEndpointDefault { if endpoint == controlconfig.GRPCEndpointDefault {
@ -50,9 +40,7 @@ func initControlService(c *cfg) {
controlSvc.WithReplicator(c.replicator), controlSvc.WithReplicator(c.replicator),
controlSvc.WithNodeState(c), controlSvc.WithNodeState(c),
controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage), controlSvc.WithLocalStorage(c.cfgObject.cfgLocalStorage.localStorage),
controlSvc.WithTreeService(treeSynchronizer{ controlSvc.WithTreeService(c.treeService),
c.treeService,
}),
controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine), controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine),
) )


@ -14,7 +14,7 @@ Only one running evacuation process is allowed on the node at a time.
## Commands ## Commands
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. `frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag. By default, both objects and trees are evacuated. To limit the evacuation scope, use the `--scope` flag (possible values are `all`, `trees`, `objects`).
`frostfs-cli control shards evacuation stop` stops running evacuation process. `frostfs-cli control shards evacuation stop` stops running evacuation process.
@ -39,15 +39,15 @@ Shard evacuation has been successfully started.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password > Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 14 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:03. Estimated time left: 2 minutes. Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 131 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:03. Estimated time left: 2 minutes.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password > Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 23 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:01:05. Estimated time left: 1 minutes. Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 260 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:01:05. Estimated time left: 1 minutes.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password > Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: completed. Evacuated 61 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:02:13. Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: completed. Evacuated 618 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 19 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:02:13.
``` ```
### Stop running evacuation process ### Stop running evacuation process
@ -58,7 +58,7 @@ Shard evacuation has been successfully started.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password > Enter password >
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 15 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:03. Estimated time left: 0 minutes. Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 131 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:03. Estimated time left: 0 minutes.
frostfs-cli control shards evacuation stop --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json frostfs-cli control shards evacuation stop --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password > Enter password >
@ -66,7 +66,7 @@ Evacuation stopped.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password > Enter password >
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: completed. Evacuated 31 object out of 73, failed to evacuate 0 objects. Error: context canceled. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:07. Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: completed. Evacuated 131 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Error: context canceled. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:07.
``` ```
### Start evacuation and await it completes ### Start evacuation and await it completes
@ -75,11 +75,11 @@ frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 -
Enter password > Enter password >
Shard evacuation has been successfully started. Shard evacuation has been successfully started.
Progress will be reported every 5 seconds. Progress will be reported every 5 seconds.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 18 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:04. Estimated time left: 0 minutes. Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 131 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:04. Estimated time left: 0 minutes.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 43 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:09. Estimated time left: 0 minutes. Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 343 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:09. Estimated time left: 0 minutes.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 68 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14. Estimated time left: 0 minutes. Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 545 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 0 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14. Estimated time left: 0 minutes.
Shard evacuation has been completed. Shard evacuation has been completed.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14. Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 618 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 19 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
``` ```
### Start evacuation and await it completes without progress notifications ### Start evacuation and await it completes without progress notifications
@ -88,5 +88,15 @@ frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 -
Enter password > Enter password >
Shard evacuation has been successfully started. Shard evacuation has been successfully started.
Shard evacuation has been completed. Shard evacuation has been completed.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:20:00Z UTC. Duration: 00:00:14. Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 618 objects out of 618, failed to evacuate: 0, skipped: 0; evacuated 19 trees out of 19, failed to evacuate: 0. Started at: 2023-05-10T10:20:00Z UTC. Duration: 00:00:14.
``` ```
### Start trees evacuation and await it completes
```bash
frostfs-cli control shards evacuation start --id FxR6QujButNCHn7jjdhxGP --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --await --scope trees
Enter password >
Shard evacuation has been successfully started.
Progress will be reported every 5 seconds.
Shard evacuation has been completed.
Shard IDs: FxR6QujButNCHn7jjdhxGP. Evacuated 0 objects out of 0, failed to evacuate: 0, skipped: 0; evacuated 2 trees out of 2, failed to evacuate: 0. Started at: 2024-02-08T08:44:17Z UTC. Duration: 00:00:00.
```


@ -575,4 +575,7 @@ const (
GetSvcV2FailedToGetRangeHashFromNode = "failed to get range hash from node" GetSvcV2FailedToGetRangeHashFromNode = "failed to get range hash from node"
GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes = "failed to get range hash from all of container nodes" GetSvcV2FailedToGetRangeHashFromAllOfContainerNodes = "failed to get range hash from all of container nodes"
FailedToUpdateShardID = "failed to update shard id" FailedToUpdateShardID = "failed to update shard id"
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
) )


@ -4,16 +4,20 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"sync"
"sync/atomic" "sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing" tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw" "git.frostfs.info/TrueCloudLab/hrw"
@ -28,83 +32,135 @@ var (
evacuationOperationLogField = zap.String("operation", "evacuation") evacuationOperationLogField = zap.String("operation", "evacuation")
) )
// EvacuateScope is an evacuation scope. Keep in sync with pkg/services/control/service.proto.
type EvacuateScope uint32
var (
EvacuateScopeObjects EvacuateScope = 1
EvacuateScopeTrees EvacuateScope = 2
)
func (s EvacuateScope) String() string {
var sb strings.Builder
first := true
if s&EvacuateScopeObjects == EvacuateScopeObjects {
if !first {
sb.WriteString(";")
}
sb.WriteString("objects")
first = false
}
if s&EvacuateScopeTrees == EvacuateScopeTrees {
if !first {
sb.WriteString(";")
}
sb.WriteString("trees")
}
return sb.String()
}
func (s EvacuateScope) WithObjects() bool {
return s&EvacuateScopeObjects == EvacuateScopeObjects
}
func (s EvacuateScope) WithTrees() bool {
return s&EvacuateScopeTrees == EvacuateScopeTrees
}
func (s EvacuateScope) TreesOnly() bool {
return s == EvacuateScopeTrees
}
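Since the scope is a plain bit mask (objects = 1, trees = 2, per the values above), `--scope all` is simply the union of the two flags, and the helper methods reduce to bit tests. A small standalone sketch that mirrors the `EvacuateScope` values rather than importing the engine package:

```go
package main

import "fmt"

// scope mirrors EvacuateScope above: objects = 1, trees = 2.
type scope uint32

const (
	scopeObjects scope = 1
	scopeTrees   scope = 2
)

func main() {
	all := scopeObjects | scopeTrees // what the CLI sends for --scope all

	fmt.Println(all&scopeObjects != 0) // true: objects are in scope
	fmt.Println(all&scopeTrees != 0)   // true: trees are in scope
	fmt.Println(all == scopeTrees)     // false: not a trees-only evacuation
}
```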
// EvacuateShardPrm represents parameters for the EvacuateShard operation. // EvacuateShardPrm represents parameters for the EvacuateShard operation.
type EvacuateShardPrm struct { type EvacuateShardPrm struct {
shardID []*shard.ID ShardID []*shard.ID
handler func(context.Context, oid.Address, *objectSDK.Object) error ObjectsHandler func(context.Context, oid.Address, *objectSDK.Object) error
ignoreErrors bool TreeHandler func(context.Context, cid.ID, string, pilorama.Forest) (string, error)
async bool IgnoreErrors bool
Async bool
Scope EvacuateScope
} }
// EvacuateShardRes represents result of the EvacuateShard operation. // EvacuateShardRes represents result of the EvacuateShard operation.
type EvacuateShardRes struct { type EvacuateShardRes struct {
evacuated *atomic.Uint64 objEvacuated *atomic.Uint64
total *atomic.Uint64 objTotal *atomic.Uint64
failed *atomic.Uint64 objFailed *atomic.Uint64
skipped *atomic.Uint64 objSkipped *atomic.Uint64
trEvacuated *atomic.Uint64
trTotal *atomic.Uint64
trFailed *atomic.Uint64
} }
// NewEvacuateShardRes creates new EvacuateShardRes instance. // NewEvacuateShardRes creates new EvacuateShardRes instance.
func NewEvacuateShardRes() *EvacuateShardRes { func NewEvacuateShardRes() *EvacuateShardRes {
return &EvacuateShardRes{ return &EvacuateShardRes{
evacuated: new(atomic.Uint64), objEvacuated: new(atomic.Uint64),
total: new(atomic.Uint64), objTotal: new(atomic.Uint64),
failed: new(atomic.Uint64), objFailed: new(atomic.Uint64),
skipped: new(atomic.Uint64), objSkipped: new(atomic.Uint64),
trEvacuated: new(atomic.Uint64),
trTotal: new(atomic.Uint64),
trFailed: new(atomic.Uint64),
} }
} }
// WithShardIDList sets shard ID. // ObjectsEvacuated returns the number of evacuated objects.
func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) {
p.shardID = id
}
// WithIgnoreErrors sets flag to ignore errors.
func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
}
// WithFaultHandler sets handler to call for objects which cannot be saved on other shards.
func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address, *objectSDK.Object) error) {
p.handler = f
}
// WithAsync sets flag to run evacuate async.
func (p *EvacuateShardPrm) WithAsync(async bool) {
p.async = async
}
// Evacuated returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated. // Objects for which handler returned no error are also assumed evacuated.
func (p *EvacuateShardRes) Evacuated() uint64 { func (p *EvacuateShardRes) ObjectsEvacuated() uint64 {
if p == nil { if p == nil {
return 0 return 0
} }
return p.evacuated.Load() return p.objEvacuated.Load()
} }
// Total returns total count objects to evacuate. // ObjectsTotal returns the total number of objects to evacuate.
func (p *EvacuateShardRes) Total() uint64 { func (p *EvacuateShardRes) ObjectsTotal() uint64 {
if p == nil { if p == nil {
return 0 return 0
} }
return p.total.Load() return p.objTotal.Load()
} }
// Failed returns count of failed objects to evacuate. // ObjectsFailed returns the number of objects that failed to evacuate.
func (p *EvacuateShardRes) Failed() uint64 { func (p *EvacuateShardRes) ObjectsFailed() uint64 {
if p == nil { if p == nil {
return 0 return 0
} }
return p.failed.Load() return p.objFailed.Load()
} }
// Skipped returns count of skipped objects. // ObjectsSkipped returns the number of skipped objects.
func (p *EvacuateShardRes) Skipped() uint64 { func (p *EvacuateShardRes) ObjectsSkipped() uint64 {
if p == nil { if p == nil {
return 0 return 0
} }
return p.skipped.Load() return p.objSkipped.Load()
}
// TreesEvacuated returns the number of evacuated trees.
func (p *EvacuateShardRes) TreesEvacuated() uint64 {
if p == nil {
return 0
}
return p.trEvacuated.Load()
}
// TreesTotal returns the total number of trees to evacuate.
func (p *EvacuateShardRes) TreesTotal() uint64 {
if p == nil {
return 0
}
return p.trTotal.Load()
}
// TreesFailed returns the number of trees that failed to evacuate.
func (p *EvacuateShardRes) TreesFailed() uint64 {
if p == nil {
return 0
}
return p.trFailed.Load()
} }
// DeepCopy returns deep copy of result instance. // DeepCopy returns deep copy of result instance.
@ -114,16 +170,22 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
} }
res := &EvacuateShardRes{ res := &EvacuateShardRes{
evacuated: new(atomic.Uint64), objEvacuated: new(atomic.Uint64),
total: new(atomic.Uint64), objTotal: new(atomic.Uint64),
failed: new(atomic.Uint64), objFailed: new(atomic.Uint64),
skipped: new(atomic.Uint64), objSkipped: new(atomic.Uint64),
trEvacuated: new(atomic.Uint64),
trTotal: new(atomic.Uint64),
trFailed: new(atomic.Uint64),
} }
res.evacuated.Store(p.evacuated.Load()) res.objEvacuated.Store(p.objEvacuated.Load())
res.total.Store(p.total.Load()) res.objTotal.Store(p.objTotal.Load())
res.failed.Store(p.failed.Load()) res.objFailed.Store(p.objFailed.Load())
res.skipped.Store(p.skipped.Load()) res.objSkipped.Store(p.objSkipped.Load())
res.trTotal.Store(p.trTotal.Load())
res.trEvacuated.Store(p.trEvacuated.Load())
res.trFailed.Store(p.trFailed.Load())
return res return res
} }
@ -145,20 +207,21 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
default: default:
} }
shardIDs := make([]string, len(prm.shardID)) shardIDs := make([]string, len(prm.ShardID))
for i := range prm.shardID { for i := range prm.ShardID {
shardIDs[i] = prm.shardID[i].String() shardIDs[i] = prm.ShardID[i].String()
} }
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
trace.WithAttributes( trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs), attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.async), attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.ignoreErrors), attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
)) ))
defer span.End() defer span.End()
shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil) shards, weights, err := e.getActualShards(shardIDs, prm)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -173,7 +236,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
} }
res := NewEvacuateShardRes() res := NewEvacuateShardRes()
ctx = ctxOrBackground(ctx, prm.async) ctx = ctxOrBackground(ctx, prm.Async)
eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res) eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
if err != nil { if err != nil {
return nil, err return nil, err
@ -183,7 +246,7 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate) return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate)
}) })
if prm.async { if prm.Async {
return nil, nil return nil, nil
} }
@ -204,8 +267,9 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
trace.WithAttributes( trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs), attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.async), attribute.Bool("async", prm.Async),
attribute.Bool("ignoreErrors", prm.ignoreErrors), attribute.Bool("ignoreErrors", prm.IgnoreErrors),
attribute.Stringer("scope", prm.Scope),
)) ))
defer func() { defer func() {
@ -214,18 +278,19 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
}() }()
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res) err = e.getTotals(ctx, prm, shardsToEvacuate, res)
if err != nil { if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField, e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err return err
} }
for _, shardID := range shardIDs { for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil { if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField) e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err return err
} }
} }
@ -233,27 +298,39 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
zap.Strings("shard_ids", shardIDs), zap.Strings("shard_ids", shardIDs),
evacuationOperationLogField, evacuationOperationLogField,
zap.Uint64("total", res.Total()), zap.Uint64("total_objects", res.ObjectsTotal()),
zap.Uint64("evacuated", res.Evacuated()), zap.Uint64("evacuated_objects", res.ObjectsEvacuated()),
zap.Uint64("failed", res.Failed()), zap.Uint64("failed_objects", res.ObjectsFailed()),
zap.Uint64("skipped", res.Skipped()), zap.Uint64("skipped_objects", res.ObjectsSkipped()),
zap.Uint64("total_trees", res.TreesTotal()),
zap.Uint64("evacuated_trees", res.TreesEvacuated()),
zap.Uint64("failed_trees", res.TreesFailed()),
) )
return nil return nil
} }
func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error { func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount") ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotals")
defer span.End() defer span.End()
for _, sh := range shardsToEvacuate { for _, sh := range shardsToEvacuate {
cnt, err := sh.LogicalObjectsCount(ctx) if prm.Scope.WithObjects() {
if err != nil { cnt, err := sh.LogicalObjectsCount(ctx)
if errors.Is(err, shard.ErrDegradedMode) { if err != nil {
continue if errors.Is(err, shard.ErrDegradedMode) {
continue
}
return err
} }
return err res.objTotal.Add(cnt)
}
if prm.Scope.WithTrees() && sh.PiloramaEnabled() {
cnt, err := pilorama.TreeCountAll(ctx, sh)
if err != nil {
return err
}
res.trTotal.Add(cnt)
} }
res.total.Add(cnt)
} }
return nil return nil
} }
@ -267,6 +344,24 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
)) ))
defer span.End() defer span.End()
if prm.Scope.WithObjects() {
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
}
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
}
return nil
}
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) error {
var listPrm shard.ListWithCursorPrm var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize) listPrm.WithCount(defaultEvacuateBatchSize)
@ -297,7 +392,198 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
return nil return nil
} }
func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool) ([]pooledShard, []float64, error) { func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) error {
sh := shardsToEvacuate[shardID]
var listPrm pilorama.TreeListTreesPrm
first := true
for len(listPrm.NextPageToken) > 0 || first {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
first = false
listRes, err := sh.TreeListTrees(ctx, listPrm)
if err != nil {
return err
}
listPrm.NextPageToken = listRes.NextPageToken
if err := e.evacuateTrees(ctx, sh, listRes.Items, prm, res, shards, weights, shardsToEvacuate); err != nil {
return err
}
}
return nil
}
func (e *StorageEngine) evacuateTrees(ctx context.Context, sh *shard.Shard, trees []pilorama.ContainerIDTreeID,
prm EvacuateShardPrm, res *EvacuateShardRes, shards []pooledShard, weights []float64,
shardsToEvacuate map[string]*shard.Shard,
) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateTrees",
trace.WithAttributes(
attribute.Int("trees_count", len(trees)),
))
defer span.End()
for _, contTree := range trees {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
success, shardID, err := e.tryEvacuateTreeLocal(ctx, sh, contTree, prm, shards, weights, shardsToEvacuate)
if err != nil {
return err
}
if success {
e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedLocal,
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
zap.String("from_shard_id", sh.ID().String()), zap.String("to_shard_id", shardID),
evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
res.trEvacuated.Add(1)
continue
}
nodePK, err := e.evacuateTreeToOtherNode(ctx, sh, contTree, prm)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveTree,
zap.String("cid", contTree.CID.EncodeToString()), zap.String("tree_id", contTree.TreeID),
zap.String("from_shard_id", sh.ID().String()), evacuationOperationLogField,
zap.Error(err), zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
e.log.Debug(logs.EngineShardsEvacuationTreeEvacuatedRemote,
zap.String("cid", contTree.CID.EncodeToString()), zap.String("treeID", contTree.TreeID),
zap.String("from_shardID", sh.ID().String()), zap.String("to_node", nodePK),
evacuationOperationLogField, zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
res.trEvacuated.Add(1)
}
return nil
}
func (e *StorageEngine) evacuateTreeToOtherNode(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID, prm EvacuateShardPrm) (string, error) {
if prm.TreeHandler == nil {
return "", fmt.Errorf("failed to evacuate tree '%s' for container %s from shard %s: local evacuation failed, but no remote evacuation available", tree.TreeID, tree.CID, sh.ID())
}
return prm.TreeHandler(ctx, tree.CID, tree.TreeID, sh)
}
func (e *StorageEngine) tryEvacuateTreeLocal(ctx context.Context, sh *shard.Shard, tree pilorama.ContainerIDTreeID,
prm EvacuateShardPrm, shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) (bool, string, error) {
target, found, err := e.findShardToEvacuateTree(ctx, tree, shards, weights, shardsToEvacuate)
if err != nil {
return false, "", err
}
if !found {
return false, "", nil
}
const readBatchSize = 1000
source := make(chan *pilorama.Move, readBatchSize)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
var applyErr error
go func() {
defer wg.Done()
applyErr = target.TreeApplyStream(ctx, tree.CID, tree.TreeID, source)
if applyErr != nil {
cancel()
}
}()
var height uint64
for {
op, err := sh.TreeGetOpLog(ctx, tree.CID, tree.TreeID, height)
if err != nil {
cancel()
wg.Wait()
close(source) // closed after cancel so that the apply goroutine sees ctx.Done() first
if prm.IgnoreErrors {
return false, "", nil
}
return false, "", err
}
if op.Time == 0 { // the op log is exhausted, nothing more to stream
close(source)
wg.Wait()
if applyErr == nil {
return true, target.ID().String(), nil
}
if prm.IgnoreErrors {
return false, "", nil
}
return false, "", applyErr
}
select {
case <-ctx.Done(): // apply stream failed or operation cancelled
wg.Wait()
if prm.IgnoreErrors {
return false, "", nil
}
if applyErr != nil {
return false, "", applyErr
}
return false, "", ctx.Err()
case source <- &op:
}
height = op.Time + 1
}
}
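The local tree copy above streams the source shard's operation log into `TreeApplyStream` through a bounded channel, cancelling the shared context as soon as either side fails. A distilled, self-contained sketch of that hand-off, with `readNext` and `apply` as hypothetical stand-ins for `TreeGetOpLog` and `TreeApplyStream`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// op stands in for pilorama.Move; only the Time field matters here.
type op struct{ Time uint64 }

// streamCopy sketches the producer/consumer hand-off: a consumer goroutine
// applies operations from a bounded channel while the producer reads them one
// by one; a failure on either side cancels the shared context so the other
// side stops promptly.
func streamCopy(ctx context.Context,
	readNext func(height uint64) (op, bool, error),
	apply func(context.Context, <-chan op) error,
) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	src := make(chan op, 1000)
	var (
		wg       sync.WaitGroup
		applyErr error
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		applyErr = apply(ctx, src)
		if applyErr != nil {
			cancel() // unblock the producer if the consumer fails
		}
	}()

	var height uint64
	for {
		next, ok, err := readNext(height)
		if err != nil || !ok { // read error, or the op log is exhausted
			close(src) // let the consumer drain buffered ops and finish
			wg.Wait()
			if err != nil {
				return err
			}
			return applyErr
		}
		select {
		case <-ctx.Done(): // consumer failed or the caller cancelled
			close(src)
			wg.Wait()
			if applyErr != nil {
				return applyErr
			}
			return ctx.Err()
		case src <- next:
			height = next.Time + 1
		}
	}
}

func main() {
	queue := []op{{Time: 1}, {Time: 2}, {Time: 5}}
	readNext := func(height uint64) (op, bool, error) {
		for _, o := range queue {
			if o.Time >= height {
				return o, true, nil
			}
		}
		return op{}, false, nil
	}
	apply := func(_ context.Context, src <-chan op) error {
		for o := range src {
			fmt.Println("applied operation with time", o.Time)
		}
		return nil
	}
	fmt.Println(streamCopy(context.Background(), readNext, apply)) // <nil>
}
```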
// findShardToEvacuateTree returns the first shard in HRW order on which the tree already exists,
// or, if no such shard is found, the first suitable shard in HRW order.
func (e *StorageEngine) findShardToEvacuateTree(ctx context.Context, tree pilorama.ContainerIDTreeID,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard,
) (pooledShard, bool, error) {
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.StringHash(tree.CID.EncodeToString()))
var result pooledShard
var found bool
for _, target := range shards {
select {
case <-ctx.Done():
return pooledShard{}, false, ctx.Err()
default:
}
if _, ok := shardsToEvacuate[target.ID().String()]; ok {
continue
}
if !target.PiloramaEnabled() || target.GetMode().ReadOnly() {
continue
}
if !found {
result = target
found = true
}
exists, err := target.TreeExists(ctx, tree.CID, tree.TreeID)
if err != nil {
continue
}
if exists {
return target, true, nil
}
}
return result, found, nil
}
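The selection rule above boils down to: walk the shards in HRW order, skip those being evacuated, read-only, or without pilorama, return the first one that already holds the tree, and otherwise fall back to the first eligible one. A minimal sketch of that rule, with `eligible` and `hasTree` as hypothetical stand-ins for the mode/pilorama checks and `TreeExists`:

```go
package main

import "fmt"

// pickTarget mirrors the selection rule above; candidates arrive pre-sorted by HRW weight.
func pickTarget(ranked []string, eligible, hasTree func(id string) bool) (string, bool) {
	var fallback string
	found := false
	for _, id := range ranked {
		if !eligible(id) {
			continue // being evacuated, read-only, or no pilorama
		}
		if hasTree(id) {
			return id, true // prefer a shard that already holds the tree
		}
		if !found {
			fallback, found = id, true // remember the first eligible shard
		}
	}
	return fallback, found
}

func main() {
	ranked := []string{"sh1", "sh2", "sh3"}
	eligible := func(id string) bool { return id != "sh1" } // sh1 is being evacuated
	hasTree := func(id string) bool { return id == "sh3" }
	fmt.Println(pickTarget(ranked, eligible, hasTree)) // sh3 true
}
```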
func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) ([]pooledShard, []float64, error) {
e.mtx.RLock() e.mtx.RLock()
defer e.mtx.RUnlock() defer e.mtx.RUnlock()
@ -310,9 +596,17 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
if !sh.GetMode().ReadOnly() { if !sh.GetMode().ReadOnly() {
return nil, nil, ErrMustBeReadOnly return nil, nil, ErrMustBeReadOnly
} }
if prm.Scope.TreesOnly() && !sh.PiloramaEnabled() {
return nil, nil, fmt.Errorf("shard %s doesn't have pilorama enabled", sh.ID())
}
} }
if len(e.shards)-len(shardIDs) < 1 && !handlerDefined { if len(e.shards)-len(shardIDs) < 1 && prm.ObjectsHandler == nil && prm.Scope.WithObjects() {
return nil, nil, errMustHaveTwoShards
}
if len(e.shards)-len(shardIDs) < 1 && prm.TreeHandler == nil && prm.Scope.WithTrees() {
return nil, nil, errMustHaveTwoShards return nil, nil, errMustHaveTwoShards
} }
@ -357,8 +651,8 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
getRes, err := sh.Get(ctx, getPrm) getRes, err := sh.Get(ctx, getPrm)
if err != nil { if err != nil {
if prm.ignoreErrors { if prm.IgnoreErrors {
res.failed.Add(1) res.objFailed.Add(1)
continue continue
} }
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
@ -375,19 +669,19 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
continue continue
} }
if prm.handler == nil { if prm.ObjectsHandler == nil {
// Do not check ignoreErrors flag here because // Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless. // ignoring errors on put make this command kinda useless.
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
} }
err = prm.handler(ctx, addr, getRes.Object()) err = prm.ObjectsHandler(ctx, addr, getRes.Object())
if err != nil { if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err return err
} }
res.evacuated.Add(1) res.objEvacuated.Add(1)
} }
return nil return nil
} }
@ -408,7 +702,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
} }
switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status { switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
case putToShardSuccess: case putToShardSuccess:
res.evacuated.Add(1) res.objEvacuated.Add(1)
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard, e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
zap.Stringer("from", sh.ID()), zap.Stringer("from", sh.ID()),
zap.Stringer("to", shards[j].ID()), zap.Stringer("to", shards[j].ID()),
@ -417,7 +711,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return true, nil return true, nil
case putToShardExists, putToShardRemoved: case putToShardExists, putToShardRemoved:
res.skipped.Add(1) res.objSkipped.Add(1)
return true, nil return true, nil
default: default:
continue continue


@ -34,32 +34,53 @@ func (s *EvacuationState) ShardIDs() []string {
return s.shardIDs return s.shardIDs
} }
func (s *EvacuationState) Evacuated() uint64 { func (s *EvacuationState) ObjectsEvacuated() uint64 {
if s == nil { if s == nil {
return 0 return 0
} }
return s.result.Evacuated() return s.result.ObjectsEvacuated()
} }
func (s *EvacuationState) Total() uint64 { func (s *EvacuationState) ObjectsTotal() uint64 {
if s == nil { if s == nil {
return 0 return 0
} }
return s.result.Total() return s.result.ObjectsTotal()
} }
func (s *EvacuationState) Failed() uint64 { func (s *EvacuationState) ObjectsFailed() uint64 {
if s == nil { if s == nil {
return 0 return 0
} }
return s.result.Failed() return s.result.ObjectsFailed()
} }
func (s *EvacuationState) Skipped() uint64 { func (s *EvacuationState) ObjectsSkipped() uint64 {
if s == nil { if s == nil {
return 0 return 0
} }
return s.result.Skipped() return s.result.ObjectsSkipped()
}
func (s *EvacuationState) TreesEvacuated() uint64 {
if s == nil {
return 0
}
return s.result.TreesEvacuated()
}
func (s *EvacuationState) TreesTotal() uint64 {
if s == nil {
return 0
}
return s.result.TreesTotal()
}
func (s *EvacuationState) TreesFailed() uint64 {
if s == nil {
return 0
}
return s.result.TreesFailed()
} }
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState { func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {


@ -14,9 +14,11 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -41,6 +43,10 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))), meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0o700), meta.WithPermissions(0o700),
meta.WithEpochState(epochState{})), meta.WithEpochState(epochState{})),
shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", id))),
pilorama.WithPerm(0o700),
),
} }
}) })
e, ids := te.engine, te.shardIDs e, ids := te.engine, te.shardIDs
@ -48,36 +54,32 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
require.NoError(t, e.Init(context.Background())) require.NoError(t, e.Init(context.Background()))
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids)) objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
treeID := "version"
for _, sh := range ids { meta := []pilorama.KeyValue{
obj := testutil.GenerateObjectWithCID(cidtest.ID()) {Key: pilorama.AttributeVersion, Value: []byte("XXX")},
objects = append(objects, obj) {Key: pilorama.AttributeFilename, Value: []byte("file.txt")},
var putPrm shard.PutPrm
putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err)
} }
for i := 0; ; i++ { for _, sh := range ids {
objects = append(objects, testutil.GenerateObjectWithCID(cidtest.ID())) for i := 0; i < objPerShard; i++ {
contID := cidtest.ID()
obj := testutil.GenerateObjectWithCID(contID)
objects = append(objects, obj)
var putPrm PutPrm var putPrm shard.PutPrm
putPrm.WithObject(objects[len(objects)-1]) putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err)
err := e.Put(context.Background(), putPrm) _, err = e.shards[sh.String()].TreeAddByPath(context.Background(), pilorama.CIDDescriptor{CID: contID, Position: 0, Size: 1},
require.NoError(t, err) treeID, pilorama.AttributeFilename, []string{"path", "to", "the", "file"}, meta)
require.NoError(t, err)
res, err := e.shards[ids[len(ids)-1].String()].List(context.Background())
require.NoError(t, err)
if len(res.AddressList()) == objPerShard {
break
} }
} }
return e, ids, objects return e, ids, objects
} }
func TestEvacuateShard(t *testing.T) { func TestEvacuateShardObjects(t *testing.T) {
t.Parallel() t.Parallel()
const objPerShard = 3 const objPerShard = 3
@ -102,19 +104,20 @@ func TestEvacuateShard(t *testing.T) {
checkHasObjects(t) checkHasObjects(t)
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.WithShardIDList(ids[2:3]) prm.ShardID = ids[2:3]
prm.Scope = EvacuateScopeObjects
t.Run("must be read-only", func(t *testing.T) { t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, ErrMustBeReadOnly) require.ErrorIs(t, err, ErrMustBeReadOnly)
require.Equal(t, uint64(0), res.Evacuated()) require.Equal(t, uint64(0), res.ObjectsEvacuated())
}) })
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(objPerShard), res.Evacuated()) require.Equal(t, uint64(objPerShard), res.ObjectsEvacuated())
// We check that all objects are available both before and after shard removal. // We check that all objects are available both before and after shard removal.
// First case is a real-world use-case. It ensures that an object can be put in presense // First case is a real-world use-case. It ensures that an object can be put in presense
@ -125,7 +128,7 @@ func TestEvacuateShard(t *testing.T) {
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done. // Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm) res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(0), res.Evacuated()) require.Equal(t, uint64(0), res.ObjectsEvacuated())
checkHasObjects(t) checkHasObjects(t)
@ -137,7 +140,7 @@ func TestEvacuateShard(t *testing.T) {
checkHasObjects(t) checkHasObjects(t)
} }
func TestEvacuateNetwork(t *testing.T) { func TestEvacuateObjectsNetwork(t *testing.T) {
t.Parallel() t.Parallel()
errReplication := errors.New("handler error") errReplication := errors.New("handler error")
@ -173,17 +176,18 @@ func TestEvacuateNetwork(t *testing.T) {
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.shardID = ids[0:1] prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards) require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, uint64(0), res.Evacuated()) require.Equal(t, uint64(0), res.ObjectsEvacuated())
prm.handler = acceptOneOf(objects, 2) prm.ObjectsHandler = acceptOneOf(objects, 2)
res, err = e.Evacuate(context.Background(), prm) res, err = e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication) require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.Evacuated()) require.Equal(t, uint64(2), res.ObjectsEvacuated())
}) })
t.Run("multiple shards, evacuate one", func(t *testing.T) { t.Run("multiple shards, evacuate one", func(t *testing.T) {
t.Parallel() t.Parallel()
@ -196,19 +200,20 @@ func TestEvacuateNetwork(t *testing.T) {
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.shardID = ids[1:2] prm.ShardID = ids[1:2]
prm.handler = acceptOneOf(objects, 2) prm.ObjectsHandler = acceptOneOf(objects, 2)
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication) require.ErrorIs(t, err, errReplication)
require.Equal(t, uint64(2), res.Evacuated()) require.Equal(t, uint64(2), res.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) { t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, 3) prm.ObjectsHandler = acceptOneOf(objects, 3)
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(3), res.Evacuated()) require.Equal(t, uint64(3), res.ObjectsEvacuated())
}) })
}) })
t.Run("multiple shards, evacuate many", func(t *testing.T) { t.Run("multiple shards, evacuate many", func(t *testing.T) {
@ -233,19 +238,20 @@ func TestEvacuateNetwork(t *testing.T) {
} }
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.shardID = evacuateIDs prm.ShardID = evacuateIDs
prm.handler = acceptOneOf(objects, totalCount-1) prm.ObjectsHandler = acceptOneOf(objects, totalCount-1)
prm.Scope = EvacuateScopeObjects
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication) require.ErrorIs(t, err, errReplication)
require.Equal(t, totalCount-1, res.Evacuated()) require.Equal(t, totalCount-1, res.ObjectsEvacuated())
t.Run("no errors", func(t *testing.T) { t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, totalCount) prm.ObjectsHandler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, totalCount, res.Evacuated()) require.Equal(t, totalCount, res.ObjectsEvacuated())
}) })
}) })
} }
@ -261,8 +267,8 @@ func TestEvacuateCancellation(t *testing.T) {
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.shardID = ids[1:2] prm.ShardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
@ -270,13 +276,14 @@ func TestEvacuateCancellation(t *testing.T) {
} }
return nil return nil
} }
prm.Scope = EvacuateScopeObjects
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()
res, err := e.Evacuate(ctx, prm) res, err := e.Evacuate(ctx, prm)
require.ErrorContains(t, err, "context canceled") require.ErrorContains(t, err, "context canceled")
require.Equal(t, uint64(0), res.Evacuated()) require.Equal(t, uint64(0), res.ObjectsEvacuated())
} }
func TestEvacuateSingleProcess(t *testing.T) { func TestEvacuateSingleProcess(t *testing.T) {
@ -292,8 +299,9 @@ func TestEvacuateSingleProcess(t *testing.T) {
running := make(chan interface{}) running := make(chan interface{})
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.shardID = ids[1:2] prm.ShardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { prm.Scope = EvacuateScopeObjects
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select { select {
case <-running: case <-running:
default: default:
@ -307,21 +315,21 @@ func TestEvacuateSingleProcess(t *testing.T) {
eg.Go(func() error { eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm) res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed") require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated()) require.Equal(t, uint64(3), res.ObjectsEvacuated())
return nil return nil
}) })
eg.Go(func() error { eg.Go(func() error {
<-running <-running
res, err := e.Evacuate(egCtx, prm) res, err := e.Evacuate(egCtx, prm)
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed") require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
require.Equal(t, uint64(0), res.Evacuated()) require.Equal(t, uint64(0), res.ObjectsEvacuated())
close(blocker) close(blocker)
return nil return nil
}) })
require.NoError(t, eg.Wait()) require.NoError(t, eg.Wait())
} }
func TestEvacuateAsync(t *testing.T) { func TestEvacuateObjectsAsync(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3) e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() { defer func() {
require.NoError(t, e.Close(context.Background())) require.NoError(t, e.Close(context.Background()))
@ -334,8 +342,9 @@ func TestEvacuateAsync(t *testing.T) {
running := make(chan interface{}) running := make(chan interface{})
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.shardID = ids[1:2] prm.ShardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error { prm.Scope = EvacuateScopeObjects
prm.ObjectsHandler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select { select {
case <-running: case <-running:
default: default:
@ -348,7 +357,7 @@ func TestEvacuateAsync(t *testing.T) {
st, err := e.GetEvacuationState(context.Background()) st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed") require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state") require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid init count") require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at") require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at") require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids") require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
@ -358,7 +367,7 @@ func TestEvacuateAsync(t *testing.T) {
eg.Go(func() error { eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm) res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed") require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated()) require.Equal(t, uint64(3), res.ObjectsEvacuated())
return nil return nil
}) })
@@ -367,7 +376,7 @@ func TestEvacuateAsync(t *testing.T) {
st, err = e.GetEvacuationState(context.Background()) st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get running state failed") require.NoError(t, err, "get running state failed")
require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state") require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid running count") require.Equal(t, uint64(0), st.ObjectsEvacuated(), "invalid running count")
require.NotNil(t, st.StartedAt(), "invalid running started at") require.NotNil(t, st.StartedAt(), "invalid running started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at") require.Nil(t, st.FinishedAt(), "invalid init finished at")
expectedShardIDs := make([]string, 0, 2) expectedShardIDs := make([]string, 0, 2)
@@ -385,7 +394,7 @@ func TestEvacuateAsync(t *testing.T) {
}, 3*time.Second, 10*time.Millisecond, "invalid final state") }, 3*time.Second, 10*time.Millisecond, "invalid final state")
require.NoError(t, err, "get final state failed") require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(3), st.Evacuated(), "invalid final count") require.Equal(t, uint64(3), st.ObjectsEvacuated(), "invalid final count")
require.NotNil(t, st.StartedAt(), "invalid final started at") require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at") require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids") require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
@@ -393,3 +402,166 @@ func TestEvacuateAsync(t *testing.T) {
require.NoError(t, eg.Wait()) require.NoError(t, eg.Wait())
} }
func TestEvacuateTreesLocal(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
var prm EvacuateShardPrm
prm.ShardID = ids[0:1]
prm.Scope = EvacuateScopeTrees
expectedShardIDs := make([]string, 0, 1)
for _, id := range ids[0:1] {
expectedShardIDs = append(expectedShardIDs, id.String())
}
st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm)
require.NotNil(t, res, "sync evacuation result must be not nil")
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.Equal(t, uint64(3), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(3), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[0].String()])
require.NoError(t, err, "list source trees failed")
require.Len(t, sourceTrees, 3)
for _, tr := range sourceTrees {
exists, err := e.shards[ids[1].String()].TreeExists(context.Background(), tr.CID, tr.TreeID)
require.NoError(t, err, "failed to check tree existance")
require.True(t, exists, "tree doesn't exists on target shard")
var height uint64
var sourceOps []pilorama.Move
for {
op, err := e.shards[ids[0].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
require.NoError(t, err)
if op.Time == 0 {
break
}
sourceOps = append(sourceOps, op)
height = op.Time + 1
}
height = 0
var targetOps []pilorama.Move
for {
op, err := e.shards[ids[1].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
require.NoError(t, err)
if op.Time == 0 {
break
}
targetOps = append(targetOps, op)
height = op.Time + 1
}
require.Equal(t, sourceOps, targetOps)
}
}
func TestEvacuateTreesRemote(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
evacuatedTreeOps := make(map[string][]*pilorama.Move)
var prm EvacuateShardPrm
prm.ShardID = ids
prm.Scope = EvacuateScopeTrees
prm.TreeHandler = func(ctx context.Context, contID cid.ID, treeID string, f pilorama.Forest) (string, error) {
key := contID.String() + treeID
var height uint64
for {
op, err := f.TreeGetOpLog(ctx, contID, treeID, height)
require.NoError(t, err)
if op.Time == 0 {
return "", nil
}
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
height = op.Time + 1
}
}
expectedShardIDs := make([]string, 0, len(ids))
for _, id := range ids {
expectedShardIDs = append(expectedShardIDs, id.String())
}
st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.TreesEvacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
res, err := e.Evacuate(context.Background(), prm)
require.NotNil(t, res, "sync evacuation must return not nil")
require.NoError(t, err, "evacuation failed")
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get evacuation state failed")
require.Equal(t, EvacuateProcessStateCompleted, st.ProcessingStatus())
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(6), st.TreesTotal(), "invalid trees total count")
require.Equal(t, uint64(6), st.TreesEvacuated(), "invalid trees evacuated count")
require.Equal(t, uint64(0), st.TreesFailed(), "invalid trees failed count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
expectedTreeOps := make(map[string][]*pilorama.Move)
for i := 0; i < len(e.shards); i++ {
sourceTrees, err := pilorama.TreeListAll(context.Background(), e.shards[ids[i].String()])
require.NoError(t, err, "list source trees failed")
require.Len(t, sourceTrees, 3)
for _, tr := range sourceTrees {
key := tr.CID.String() + tr.TreeID
var height uint64
for {
op, err := e.shards[ids[i].String()].TreeGetOpLog(context.Background(), tr.CID, tr.TreeID, height)
require.NoError(t, err)
if op.Time == 0 {
break
}
expectedTreeOps[key] = append(expectedTreeOps[key], &op)
height = op.Time + 1
}
}
}
require.Equal(t, expectedTreeOps, evacuatedTreeOps)
}

View file

@@ -549,6 +549,58 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
return metaerr.Wrap(err) return metaerr.Wrap(err)
} }
// TreeApplyStream should be used with caution: this method blocks other write transactions until `source` is closed.
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeApplyStream", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyStream",
trace.WithAttributes(
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return ErrDegradedMode
} else if t.mode.ReadOnly() {
return ErrReadOnlyMode
}
fullID := bucketName(cnr, treeID)
err := metaerr.Wrap(t.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, fullID)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case m, ok := <-source:
if !ok {
return nil
}
var lm Move
if e := t.applyOperation(bLog, bTree, []*Move{m}, &lm); e != nil {
return e
}
}
}
}))
success = err == nil
return err
}
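The caution above matters for callers: TreeApplyStream keeps a single write transaction open until `source` is closed, so the producer should push operations promptly and close the channel when it is done. A minimal caller-side sketch, assuming an already populated source forest and a destination ForestStorage (the streamTreeOps helper and its error handling are illustrative only, not part of this change):

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

// streamTreeOps copies the full operation log of one tree from src to dst
// by feeding TreeApplyStream from a channel that is closed once the log ends.
func streamTreeOps(ctx context.Context, src pilorama.Forest, dst pilorama.ForestStorage, cnr cid.ID, treeID string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // unblocks the producer if TreeApplyStream returns early

	source := make(chan *pilorama.Move)
	errCh := make(chan error, 1)
	go func() {
		defer close(source) // closing the channel lets TreeApplyStream commit and return
		var height uint64
		for {
			op, err := src.TreeGetOpLog(ctx, cnr, treeID, height)
			if err != nil {
				errCh <- err
				return
			}
			if op.Time == 0 { // no more operations in the log
				errCh <- nil
				return
			}
			m := op // copy before sending: the loop variable is reused
			select {
			case source <- &m:
			case <-ctx.Done():
				errCh <- ctx.Err()
				return
			}
			height = op.Time + 1
		}
	}()

	if err := dst.TreeApplyStream(ctx, cnr, treeID, source); err != nil {
		return err
	}
	return <-errCh
}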
func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) { func (t *boltForest) addBatch(cnr cidSDK.ID, treeID string, m *Move, ch chan error) {
t.mtx.Lock() t.mtx.Lock()
for i := 0; i < len(t.batches); i++ { for i := 0; i < len(t.batches); i++ {
@@ -1134,6 +1186,68 @@ func (t *boltForest) TreeDrop(ctx context.Context, cid cidSDK.ID, treeID string)
return err return err
} }
// TreeListTrees implements ForestStorage.
func (t *boltForest) TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeListTrees", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeListTrees")
defer span.End()
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
batchSize := prm.BatchSize
if batchSize <= 0 {
batchSize = treeListTreesBatchSizeDefault
}
var res TreeListTreesResult
err := metaerr.Wrap(t.db.View(func(tx *bbolt.Tx) error {
c := tx.Cursor()
checkNextPageToken := true
for k, _ := c.Seek(prm.NextPageToken); k != nil; k, _ = c.Next() {
if bytes.Equal(k, dataBucket) || bytes.Equal(k, logBucket) {
continue
}
if checkNextPageToken && bytes.Equal(k, prm.NextPageToken) {
checkNextPageToken = false
continue
}
var contID cidSDK.ID
if err := contID.Decode(k[:32]); err != nil {
return fmt.Errorf("failed to decode containerID: %w", err)
}
res.Items = append(res.Items, ContainerIDTreeID{
CID: contID,
TreeID: string(k[32:]),
})
if len(res.Items) == batchSize {
res.NextPageToken = make([]byte, len(k))
copy(res.NextPageToken, k)
break
}
}
return nil
}))
success = err == nil
if err != nil {
return nil, err
}
return &res, nil
}
func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) { func (t *boltForest) getPathPrefix(bTree *bbolt.Bucket, attr string, path []string) (int, Node, error) {
c := bTree.Cursor() c := bTree.Cursor()

View file

@@ -2,6 +2,7 @@ package pilorama
import ( import (
"context" "context"
"fmt"
"sort" "sort"
"strings" "strings"
@@ -260,3 +261,77 @@ func (f *memoryForest) TreeLastSyncHeight(_ context.Context, cid cid.ID, treeID
} }
return t.syncHeight, nil return t.syncHeight, nil
} }
// TreeListTrees implements ForestStorage.
func (f *memoryForest) TreeListTrees(_ context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error) {
batchSize := prm.BatchSize
if batchSize <= 0 {
batchSize = treeListTreesBatchSizeDefault
}
tmpSlice := make([]string, 0, len(f.treeMap))
for k := range f.treeMap {
tmpSlice = append(tmpSlice, k)
}
sort.Strings(tmpSlice)
var idx int
if len(prm.NextPageToken) > 0 {
last := string(prm.NextPageToken)
idx, _ = sort.Find(len(tmpSlice), func(i int) int {
return -1 * strings.Compare(tmpSlice[i], last)
})
if idx == len(tmpSlice) {
return &TreeListTreesResult{}, nil
}
if tmpSlice[idx] == last {
idx++
}
}
var result TreeListTreesResult
for idx < len(tmpSlice) {
cidAndTree := strings.Split(tmpSlice[idx], "/")
if len(cidAndTree) != 2 {
return nil, fmt.Errorf("invalid format: key must be cid and treeID")
}
var contID cid.ID
if err := contID.DecodeString(cidAndTree[0]); err != nil {
return nil, fmt.Errorf("invalid format: %w", err)
}
result.Items = append(result.Items, ContainerIDTreeID{
CID: contID,
TreeID: cidAndTree[1],
})
if len(result.Items) == batchSize {
result.NextPageToken = []byte(tmpSlice[idx])
break
}
idx++
}
return &result, nil
}
// TreeApplyStream implements ForestStorage.
func (f *memoryForest) TreeApplyStream(ctx context.Context, cnr cid.ID, treeID string, source <-chan *Move) error {
fullID := cnr.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
s = newMemoryTree()
f.treeMap[fullID] = s
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case m, ok := <-source:
if !ok {
return nil
}
if e := s.Apply(m); e != nil {
return e
}
}
}
}

View file

@@ -1189,3 +1189,59 @@ func testTreeLastSyncHeight(t *testing.T, f ForestStorage) {
require.ErrorIs(t, err, ErrTreeNotFound) require.ErrorIs(t, err, ErrTreeNotFound)
}) })
} }
func TestForest_ListTrees(t *testing.T) {
for i := range providers {
i := i
t.Run(providers[i].name, func(t *testing.T) {
testTreeListTrees(t, providers[i].construct)
})
}
}
func testTreeListTrees(t *testing.T, constructor func(t testing.TB, _ ...Option) ForestStorage) {
batchSize := 10
t.Run("empty", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, 0)
})
t.Run("count lower than batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, batchSize-1)
})
t.Run("count equals batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, batchSize)
})
t.Run("count greater than batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, batchSize+1)
})
t.Run("count equals multiplied batch size", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, 3*batchSize)
})
t.Run("count equals multiplied batch size with addition", func(t *testing.T) {
testTreeListTreesCount(t, constructor, batchSize, 3*batchSize+batchSize/2)
})
}
func testTreeListTreesCount(t *testing.T, constructor func(t testing.TB, _ ...Option) ForestStorage, batchSize, count int) {
f := constructor(t)
var expected []ContainerIDTreeID
treeIDs := []string{"version", "system", "s", "avada kedavra"}
for i := 0; i < count; i++ {
cid := cidtest.ID()
treeID := treeIDs[i%len(treeIDs)]
expected = append(expected, ContainerIDTreeID{
CID: cid,
TreeID: treeID,
})
ops := prepareRandomTree(5, 5)
for _, op := range ops {
require.NoError(t, f.TreeApply(context.Background(), cid, treeID, &op, false))
}
}
actual, err := treeListAll(context.Background(), f, batchSize)
require.NoError(t, err)
require.ElementsMatch(t, expected, actual)
}

View file

@@ -63,6 +63,10 @@ type ForestStorage interface {
SetMode(m mode.Mode) error SetMode(m mode.Mode) error
SetParentID(id string) SetParentID(id string)
Forest Forest
// TreeListTrees returns all pairs "containerID:treeID".
TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error)
TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error
} }
const ( const (
@@ -85,3 +89,68 @@ var ErrInvalidCIDDescriptor = logicerr.New("cid descriptor is invalid")
func (d CIDDescriptor) checkValid() bool { func (d CIDDescriptor) checkValid() bool {
return 0 <= d.Position && d.Position < d.Size return 0 <= d.Position && d.Position < d.Size
} }
var treeListTreesBatchSizeDefault = 1000
type ContainerIDTreeID struct {
CID cidSDK.ID
TreeID string
}
type TreeListTreesPrm struct {
NextPageToken []byte
// BatchSize is the batch size used to list trees. If it is lower than or equal to zero, treeListTreesBatchSizeDefault is used.
BatchSize int
}
type TreeListTreesResult struct {
NextPageToken []byte
Items []ContainerIDTreeID
}
type treeList interface {
TreeListTrees(ctx context.Context, prm TreeListTreesPrm) (*TreeListTreesResult, error)
}
func TreeListAll(ctx context.Context, f treeList) ([]ContainerIDTreeID, error) {
return treeListAll(ctx, f, treeListTreesBatchSizeDefault)
}
func treeListAll(ctx context.Context, f treeList, batchSize int) ([]ContainerIDTreeID, error) {
var prm TreeListTreesPrm
prm.BatchSize = batchSize
var result []ContainerIDTreeID
first := true
for len(prm.NextPageToken) > 0 || first {
first = false
res, err := f.TreeListTrees(ctx, prm)
if err != nil {
return nil, err
}
prm.NextPageToken = res.NextPageToken
result = append(result, res.Items...)
}
return result, nil
}
func TreeCountAll(ctx context.Context, f treeList) (uint64, error) {
var prm TreeListTreesPrm
var result uint64
first := true
for len(prm.NextPageToken) > 0 || first {
first = false
res, err := f.TreeListTrees(ctx, prm)
if err != nil {
return 0, err
}
prm.NextPageToken = res.NextPageToken
result += uint64(len(res.Items))
}
return result, nil
}
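TreeListAll and TreeCountAll above accumulate or count every page; the same pagination contract can also be driven directly when a caller wants to process entries page by page and bound memory. A short sketch under that assumption (the visitAllTrees helper and the visit callback are hypothetical, not part of this change):

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
)

// visitAllTrees walks every "containerID:treeID" pair in pages of pageSize,
// invoking visit for each item instead of collecting the whole list in memory.
func visitAllTrees(ctx context.Context, f pilorama.ForestStorage, pageSize int, visit func(pilorama.ContainerIDTreeID) error) error {
	prm := pilorama.TreeListTreesPrm{BatchSize: pageSize}
	for {
		res, err := f.TreeListTrees(ctx, prm)
		if err != nil {
			return err
		}
		for _, item := range res.Items {
			if err := visit(item); err != nil {
				return err
			}
		}
		if len(res.NextPageToken) == 0 { // an empty token means the listing is complete
			return nil
		}
		prm.NextPageToken = res.NextPageToken
	}
}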

View file

@@ -353,3 +353,53 @@ func (s *Shard) TreeLastSyncHeight(ctx context.Context, cid cidSDK.ID, treeID st
} }
return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID) return s.pilorama.TreeLastSyncHeight(ctx, cid, treeID)
} }
func (s *Shard) TreeListTrees(ctx context.Context, prm pilorama.TreeListTreesPrm) (*pilorama.TreeListTreesResult, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeListTrees",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
),
)
defer span.End()
if s.pilorama == nil {
return nil, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, ErrDegradedMode
}
return s.pilorama.TreeListTrees(ctx, prm)
}
func (s *Shard) PiloramaEnabled() bool {
return s.pilorama != nil
}
func (s *Shard) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *pilorama.Move) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyStream",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID)),
)
defer span.End()
if s.pilorama == nil {
return ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.pilorama.TreeApplyStream(ctx, cnr, treeID, source)
}

View file

@@ -47,15 +47,18 @@ func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuation
} }
return &control.GetShardEvacuationStatusResponse{ return &control.GetShardEvacuationStatusResponse{
Body: &control.GetShardEvacuationStatusResponse_Body{ Body: &control.GetShardEvacuationStatusResponse_Body{
Shard_ID: shardIDs, Shard_ID: shardIDs,
Evacuated: state.Evacuated(), EvacuatedObjects: state.ObjectsEvacuated(),
Total: state.Total(), TotalObjects: state.ObjectsTotal(),
Failed: state.Failed(), FailedObjects: state.ObjectsFailed(),
Status: evacStatus, Status: evacStatus,
StartedAt: startedAt, StartedAt: startedAt,
Duration: duration, Duration: duration,
ErrorMessage: state.ErrorMessage(), ErrorMessage: state.ErrorMessage(),
Skipped: state.Skipped(), SkippedObjects: state.ObjectsSkipped(),
TotalTrees: state.TreesTotal(),
EvacuatedTrees: state.TreesEvacuated(),
FailedTrees: state.TreesFailed(),
}, },
}, nil }, nil
} }

View file

@@ -4,13 +4,17 @@ import (
"bytes" "bytes"
"context" "context"
"crypto/sha256" "crypto/sha256"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@@ -25,10 +29,12 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
return nil, status.Error(codes.PermissionDenied, err.Error()) return nil, status.Error(codes.PermissionDenied, err.Error())
} }
var prm engine.EvacuateShardPrm prm := engine.EvacuateShardPrm{
prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID())) ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) IgnoreErrors: req.GetBody().GetIgnoreErrors(),
prm.WithFaultHandler(s.replicate) ObjectsHandler: s.replicateObject,
Scope: engine.EvacuateScopeObjects,
}
res, err := s.s.Evacuate(ctx, prm) res, err := s.s.Evacuate(ctx, prm)
if err != nil { if err != nil {
@@ -37,7 +43,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
resp := &control.EvacuateShardResponse{ resp := &control.EvacuateShardResponse{
Body: &control.EvacuateShardResponse_Body{ Body: &control.EvacuateShardResponse_Body{
Count: uint32(res.Evacuated()), Count: uint32(res.ObjectsEvacuated()),
}, },
} }
@@ -48,7 +54,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
return resp, nil return resp, nil
} }
func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error { func (s *Server) replicateObject(ctx context.Context, addr oid.Address, obj *objectSDK.Object) error {
cid, ok := obj.ContainerID() cid, ok := obj.ContainerID()
if !ok { if !ok {
// Return nil to prevent situations where a shard can't be evacuated // Return nil to prevent situations where a shard can't be evacuated
@@ -56,33 +62,11 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK
return nil return nil
} }
nm, err := s.netMapSrc.GetNetMap(0) nodes, err := s.getContainerNodes(cid)
if err != nil { if err != nil {
return err return err
} }
c, err := s.cnrSrc.Get(cid)
if err != nil {
return err
}
binCnr := make([]byte, sha256.Size)
cid.Encode(binCnr)
ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
if err != nil {
return fmt.Errorf("can't build a list of container nodes")
}
nodes := placement.FlattenNodes(ns)
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
for i := 0; i < len(nodes); i++ {
if bytes.Equal(nodes[i].PublicKey(), bs) {
copy(nodes[i:], nodes[i+1:])
nodes = nodes[:len(nodes)-1]
}
}
var res replicatorResult var res replicatorResult
task := replicator.Task{ task := replicator.Task{
NumCopies: 1, NumCopies: 1,
@@ -98,6 +82,95 @@ func (s *Server) replicate(ctx context.Context, addr oid.Address, obj *objectSDK
return nil return nil
} }
func (s *Server) replicateTree(ctx context.Context, contID cid.ID, treeID string, forest pilorama.Forest) (string, error) {
nodes, err := s.getContainerNodes(contID)
if err != nil {
return "", err
}
if len(nodes) == 0 {
return "", fmt.Errorf("no remote nodes available to replicate tree '%s' of container %s", treeID, contID)
}
for _, node := range nodes {
err = s.replicateTreeToNode(ctx, forest, contID, treeID, node)
if err == nil {
return hex.EncodeToString(node.PublicKey()), nil
}
}
return "", err
}
func (s *Server) replicateTreeToNode(ctx context.Context, forest pilorama.Forest, contID cid.ID, treeID string, node netmap.NodeInfo) error {
rawCID := make([]byte, sha256.Size)
contID.Encode(rawCID)
var height uint64
for {
op, err := forest.TreeGetOpLog(ctx, contID, treeID, height)
if err != nil {
return err
}
if op.Time == 0 {
return nil
}
req := &tree.ApplyRequest{
Body: &tree.ApplyRequest_Body{
ContainerId: rawCID,
TreeId: treeID,
Operation: &tree.LogMove{
ParentId: op.Parent,
Meta: op.Meta.Bytes(),
ChildId: op.Child,
},
},
}
err = tree.SignMessage(req, s.key)
if err != nil {
return fmt.Errorf("can't sign apply request: %w", err)
}
err = s.treeService.ReplicateTreeOp(ctx, node, req)
if err != nil {
return err
}
height = op.Time + 1
}
}
func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) {
nm, err := s.netMapSrc.GetNetMap(0)
if err != nil {
return nil, err
}
c, err := s.cnrSrc.Get(contID)
if err != nil {
return nil, err
}
binCnr := make([]byte, sha256.Size)
contID.Encode(binCnr)
ns, err := nm.ContainerNodes(c.Value.PlacementPolicy(), binCnr)
if err != nil {
return nil, fmt.Errorf("can't build a list of container nodes")
}
nodes := placement.FlattenNodes(ns)
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
for i := 0; i < len(nodes); i++ {
if bytes.Equal(nodes[i].PublicKey(), bs) {
copy(nodes[i:], nodes[i+1:])
nodes = nodes[:len(nodes)-1]
}
}
return nodes, nil
}
type replicatorResult struct { type replicatorResult struct {
count int count int
} }

View file

@@ -17,11 +17,18 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
return nil, status.Error(codes.PermissionDenied, err.Error()) return nil, status.Error(codes.PermissionDenied, err.Error())
} }
var prm engine.EvacuateShardPrm if req.GetBody().GetScope() == uint32(control.StartShardEvacuationRequest_Body_NONE) {
prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID())) return nil, status.Error(codes.InvalidArgument, "no evacuation scope")
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors()) }
prm.WithFaultHandler(s.replicate)
prm.WithAsync(true) prm := engine.EvacuateShardPrm{
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
ObjectsHandler: s.replicateObject,
TreeHandler: s.replicateTree,
Async: true,
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
}
_, err = s.s.Evacuate(ctx, prm) _, err = s.s.Evacuate(ctx, prm)
if err != nil { if err != nil {

View file

@@ -4,14 +4,17 @@ import (
"context" "context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
// TreeService represents a tree service instance. // TreeService represents a tree service instance.
type TreeService interface { type TreeService interface {
Synchronize(ctx context.Context, cnr cid.ID, treeID string) error SynchronizeTree(ctx context.Context, cnr cid.ID, treeID string) error
ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *tree.ApplyRequest) error
} }
func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTreeRequest) (*control.SynchronizeTreeResponse, error) { func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTreeRequest) (*control.SynchronizeTreeResponse, error) {
@@ -31,7 +34,7 @@ func (s *Server) SynchronizeTree(ctx context.Context, req *control.SynchronizeTr
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
err = s.treeService.Synchronize(ctx, cnr, b.GetTreeId()) err = s.treeService.SynchronizeTree(ctx, cnr, b.GetTreeId())
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }

File diff suppressed because it is too large

View file

@@ -336,10 +336,18 @@ message DoctorResponse {
message StartShardEvacuationRequest { message StartShardEvacuationRequest {
// Request body structure. // Request body structure.
message Body { message Body {
enum Scope {
NONE = 0;
OBJECTS = 1;
TREES = 2;
}
// IDs of the shards. // IDs of the shards.
repeated bytes shard_ID = 1; repeated bytes shard_ID = 1;
// Flag indicating whether object read errors should be ignored. // Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2; bool ignore_errors = 2;
// Evacuation scope. A bitwise combination of Scope values.
uint32 scope = 3;
} }
Body body = 1; Body body = 1;
@@ -386,11 +394,11 @@ message GetShardEvacuationStatusResponse {
} }
// Total objects to evacuate count. The value is approximate, so evacuated + failed + skipped == total is not guaranteed after completion. // Total objects to evacuate count. The value is approximate, so evacuated + failed + skipped == total is not guaranteed after completion.
uint64 total = 1; uint64 total_objects = 1;
// Evacuated objects count. // Evacuated objects count.
uint64 evacuated = 2; uint64 evacuated_objects = 2;
// Failed objects count. // Failed objects count.
uint64 failed = 3; uint64 failed_objects = 3;
// Shard IDs. // Shard IDs.
repeated bytes shard_ID = 4; repeated bytes shard_ID = 4;
@@ -404,7 +412,14 @@ message GetShardEvacuationStatusResponse {
string error_message = 8; string error_message = 8;
// Skipped objects count. // Skipped objects count.
uint64 skipped = 9; uint64 skipped_objects = 9;
// Total trees to evacuate count.
uint64 total_trees = 10;
// Evacuated trees count.
uint64 evacuated_trees = 11;
// Failed trees count.
uint64 failed_trees = 12;
} }
Body body = 1; Body body = 1;

View file

@@ -1511,6 +1511,7 @@ func (x *StartShardEvacuationRequest_Body) StableSize() (size int) {
} }
size += proto.RepeatedBytesSize(1, x.Shard_ID) size += proto.RepeatedBytesSize(1, x.Shard_ID)
size += proto.BoolSize(2, x.IgnoreErrors) size += proto.BoolSize(2, x.IgnoreErrors)
size += proto.UInt32Size(3, x.Scope)
return size return size
} }
@@ -1532,6 +1533,7 @@ func (x *StartShardEvacuationRequest_Body) StableMarshal(buf []byte) []byte {
var offset int var offset int
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID) offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
offset += proto.BoolMarshal(2, buf[offset:], x.IgnoreErrors) offset += proto.BoolMarshal(2, buf[offset:], x.IgnoreErrors)
offset += proto.UInt32Marshal(3, buf[offset:], x.Scope)
return buf return buf
} }
@@ -1813,15 +1815,18 @@ func (x *GetShardEvacuationStatusResponse_Body) StableSize() (size int) {
if x == nil { if x == nil {
return 0 return 0
} }
size += proto.UInt64Size(1, x.Total) size += proto.UInt64Size(1, x.TotalObjects)
size += proto.UInt64Size(2, x.Evacuated) size += proto.UInt64Size(2, x.EvacuatedObjects)
size += proto.UInt64Size(3, x.Failed) size += proto.UInt64Size(3, x.FailedObjects)
size += proto.RepeatedBytesSize(4, x.Shard_ID) size += proto.RepeatedBytesSize(4, x.Shard_ID)
size += proto.EnumSize(5, int32(x.Status)) size += proto.EnumSize(5, int32(x.Status))
size += proto.NestedStructureSize(6, x.Duration) size += proto.NestedStructureSize(6, x.Duration)
size += proto.NestedStructureSize(7, x.StartedAt) size += proto.NestedStructureSize(7, x.StartedAt)
size += proto.StringSize(8, x.ErrorMessage) size += proto.StringSize(8, x.ErrorMessage)
size += proto.UInt64Size(9, x.Skipped) size += proto.UInt64Size(9, x.SkippedObjects)
size += proto.UInt64Size(10, x.TotalTrees)
size += proto.UInt64Size(11, x.EvacuatedTrees)
size += proto.UInt64Size(12, x.FailedTrees)
return size return size
} }
@@ -1841,15 +1846,18 @@ func (x *GetShardEvacuationStatusResponse_Body) StableMarshal(buf []byte) []byte
buf = make([]byte, x.StableSize()) buf = make([]byte, x.StableSize())
} }
var offset int var offset int
offset += proto.UInt64Marshal(1, buf[offset:], x.Total) offset += proto.UInt64Marshal(1, buf[offset:], x.TotalObjects)
offset += proto.UInt64Marshal(2, buf[offset:], x.Evacuated) offset += proto.UInt64Marshal(2, buf[offset:], x.EvacuatedObjects)
offset += proto.UInt64Marshal(3, buf[offset:], x.Failed) offset += proto.UInt64Marshal(3, buf[offset:], x.FailedObjects)
offset += proto.RepeatedBytesMarshal(4, buf[offset:], x.Shard_ID) offset += proto.RepeatedBytesMarshal(4, buf[offset:], x.Shard_ID)
offset += proto.EnumMarshal(5, buf[offset:], int32(x.Status)) offset += proto.EnumMarshal(5, buf[offset:], int32(x.Status))
offset += proto.NestedStructureMarshal(6, buf[offset:], x.Duration) offset += proto.NestedStructureMarshal(6, buf[offset:], x.Duration)
offset += proto.NestedStructureMarshal(7, buf[offset:], x.StartedAt) offset += proto.NestedStructureMarshal(7, buf[offset:], x.StartedAt)
offset += proto.StringMarshal(8, buf[offset:], x.ErrorMessage) offset += proto.StringMarshal(8, buf[offset:], x.ErrorMessage)
offset += proto.UInt64Marshal(9, buf[offset:], x.Skipped) offset += proto.UInt64Marshal(9, buf[offset:], x.SkippedObjects)
offset += proto.UInt64Marshal(10, buf[offset:], x.TotalTrees)
offset += proto.UInt64Marshal(11, buf[offset:], x.EvacuatedTrees)
offset += proto.UInt64Marshal(12, buf[offset:], x.FailedTrees)
return buf return buf
} }

View file

@@ -71,61 +71,67 @@ func (s *Service) replicationWorker(ctx context.Context) {
case <-s.closeCh: case <-s.closeCh:
return return
case task := <-s.replicationTasks: case task := <-s.replicationTasks:
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask", _ = s.ReplicateTreeOp(ctx, task.n, task.req)
trace.WithAttributes(
attribute.String("public_key", hex.EncodeToString(task.n.PublicKey())),
),
)
start := time.Now()
var lastErr error
var lastAddr string
task.n.IterateNetworkEndpoints(func(addr string) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
trace.WithAttributes(
attribute.String("public_key", hex.EncodeToString(task.n.PublicKey())),
attribute.String("address", addr),
),
)
defer span.End()
lastAddr = addr
c, err := s.cache.get(ctx, addr)
if err != nil {
lastErr = fmt.Errorf("can't create client: %w", err)
return false
}
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
_, lastErr = c.Apply(ctx, task.req)
cancel()
return lastErr == nil
})
if lastErr != nil {
if errors.Is(lastErr, errRecentlyFailed) {
s.log.Debug(logs.TreeDoNotSendUpdateToTheNode,
zap.String("last_error", lastErr.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
s.log.Warn(logs.TreeFailedToSentUpdateToTheNode,
zap.String("last_error", lastErr.Error()),
zap.String("address", lastAddr),
zap.String("key", hex.EncodeToString(task.n.PublicKey())),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
s.metrics.AddReplicateTaskDuration(time.Since(start), false)
} else {
s.metrics.AddReplicateTaskDuration(time.Since(start), true)
}
span.End()
} }
} }
} }
func (s *Service) ReplicateTreeOp(ctx context.Context, n netmapSDK.NodeInfo, req *ApplyRequest) error {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTask",
trace.WithAttributes(
attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
),
)
defer span.End()
start := time.Now()
var lastErr error
var lastAddr string
n.IterateNetworkEndpoints(func(addr string) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.HandleReplicationTaskOnEndpoint",
trace.WithAttributes(
attribute.String("public_key", hex.EncodeToString(n.PublicKey())),
attribute.String("address", addr),
),
)
defer span.End()
lastAddr = addr
c, err := s.cache.get(ctx, addr)
if err != nil {
lastErr = fmt.Errorf("can't create client: %w", err)
return false
}
ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
_, lastErr = c.Apply(ctx, req)
cancel()
return lastErr == nil
})
if lastErr != nil {
if errors.Is(lastErr, errRecentlyFailed) {
s.log.Debug(logs.TreeDoNotSendUpdateToTheNode,
zap.String("last_error", lastErr.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} else {
s.log.Warn(logs.TreeFailedToSentUpdateToTheNode,
zap.String("last_error", lastErr.Error()),
zap.String("address", lastAddr),
zap.String("key", hex.EncodeToString(n.PublicKey())),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
s.metrics.AddReplicateTaskDuration(time.Since(start), false)
return lastErr
}
s.metrics.AddReplicateTaskDuration(time.Since(start), true)
return nil
}
func (s *Service) replicateLoop(ctx context.Context) { func (s *Service) replicateLoop(ctx context.Context) {
for i := 0; i < s.replicatorWorkerCount; i++ { for i := 0; i < s.replicatorWorkerCount; i++ {
go s.replicationWorker(ctx) go s.replicationWorker(ctx)