Async evacuate #329
19 changed files with 1229 additions and 44 deletions
|
@ -11,10 +11,11 @@ import (
|
|||
const ignoreErrorsFlag = "no-errors"
|
||||
|
||||
var evacuateShardCmd = &cobra.Command{
|
||||
Use: "evacuate",
|
||||
Short: "Evacuate objects from shard",
|
||||
Long: "Evacuate objects from shard to other shards",
|
||||
Run: evacuateShard,
|
||||
Use: "evacuate",
|
||||
Short: "Evacuate objects from shard",
|
||||
Long: "Evacuate objects from shard to other shards",
|
||||
Run: evacuateShard,
|
||||
Deprecated: "use frostfs-cli control shards evacuation start",
|
||||
}
|
||||
|
||||
func evacuateShard(cmd *cobra.Command, _ []string) {
|
||||
|
|
296
cmd/frostfs-cli/modules/control/evacuation.go
Normal file
296
cmd/frostfs-cli/modules/control/evacuation.go
Normal file
|
@ -0,0 +1,296 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"github.com/spf13/cobra"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
const (
|
||||
awaitFlag = "await"
|
||||
noProgressFlag = "no-progress"
|
||||
)
|
||||
|
||||
var evacuationShardCmd = &cobra.Command{
|
||||
Use: "evacuation",
|
||||
Short: "Objects evacuation from shard",
|
||||
Long: "Objects evacuation from shard to other shards",
|
||||
}
|
||||
|
||||
var startEvacuationShardCmd = &cobra.Command{
|
||||
Use: "start",
|
||||
Short: "Start evacuate objects from shard",
|
||||
Long: "Start evacuate objects from shard to other shards",
|
||||
Run: startEvacuateShard,
|
||||
}
|
||||
|
||||
var getEvacuationShardStatusCmd = &cobra.Command{
|
||||
Use: "status",
|
||||
Short: "Get evacuate objects from shard status",
|
||||
Long: "Get evacuate objects from shard to other shards status",
|
||||
Run: getEvacuateShardStatus,
|
||||
}
|
||||
|
||||
var stopEvacuationShardCmd = &cobra.Command{
|
||||
Use: "stop",
|
||||
Short: "Stop running evacuate process",
|
||||
Long: "Stop running evacuate process from shard to other shards",
|
||||
Run: stopEvacuateShardStatus,
|
||||
}
|
||||
|
||||
func startEvacuateShard(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
|
||||
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
||||
|
||||
req := &control.StartShardEvacuationRequest{
|
||||
Body: &control.StartShardEvacuationRequest_Body{
|
||||
Shard_ID: getShardIDList(cmd),
|
||||
IgnoreErrors: ignoreErrors,
|
||||
},
|
||||
}
|
||||
|
||||
signRequest(cmd, pk, req)
|
||||
|
||||
cli := getClient(cmd, pk)
|
||||
|
||||
var resp *control.StartShardEvacuationResponse
|
||||
var err error
|
||||
err = cli.ExecRaw(func(client *client.Client) error {
|
||||
resp, err = control.StartEvacuateShard(client, req)
|
||||
return err
|
||||
})
|
||||
commonCmd.ExitOnErr(cmd, "Start evacuate shards failed, rpc error: %w", err)
|
||||
|
||||
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||
|
||||
cmd.Println("Shard evacuation has been successfully started.")
|
||||
|
||||
if awaitCompletion, _ := cmd.Flags().GetBool(awaitFlag); awaitCompletion {
|
||||
noProgress, _ := cmd.Flags().GetBool(noProgressFlag)
|
||||
waitEvacuateCompletion(cmd, pk, cli, !noProgress, true)
|
||||
}
|
||||
}
|
||||
|
||||
func getEvacuateShardStatus(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
req := &control.GetShardEvacuationStatusRequest{
|
||||
Body: &control.GetShardEvacuationStatusRequest_Body{},
|
||||
}
|
||||
|
||||
signRequest(cmd, pk, req)
|
||||
|
||||
cli := getClient(cmd, pk)
|
||||
|
||||
var resp *control.GetShardEvacuationStatusResponse
|
||||
var err error
|
||||
err = cli.ExecRaw(func(client *client.Client) error {
|
||||
resp, err = control.GetEvacuateShardStatus(client, req)
|
||||
return err
|
||||
})
|
||||
commonCmd.ExitOnErr(cmd, "Get evacuate shards status failed, rpc error: %w", err)
|
||||
|
||||
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||
|
||||
printStatus(cmd, resp)
|
||||
}
|
||||
|
||||
func stopEvacuateShardStatus(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
req := &control.StopShardEvacuationRequest{
|
||||
Body: &control.StopShardEvacuationRequest_Body{},
|
||||
}
|
||||
|
||||
signRequest(cmd, pk, req)
|
||||
|
||||
cli := getClient(cmd, pk)
|
||||
|
||||
var resp *control.StopShardEvacuationResponse
|
||||
var err error
|
||||
err = cli.ExecRaw(func(client *client.Client) error {
|
||||
resp, err = control.StopEvacuateShard(client, req)
|
||||
return err
|
||||
})
|
||||
commonCmd.ExitOnErr(cmd, "Stop evacuate shards failed, rpc error: %w", err)
|
||||
|
||||
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||
|
||||
waitEvacuateCompletion(cmd, pk, cli, false, false)
|
||||
|
||||
cmd.Println("Evacuation stopped.")
|
||||
}
|
||||
|
||||
func waitEvacuateCompletion(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *clientSDK.Client, printProgress, printCompleted bool) {
|
||||
const statusPollingInterval = 1 * time.Second
|
||||
const reportIntervalSeconds = 5
|
||||
var resp *control.GetShardEvacuationStatusResponse
|
||||
reportResponse := atomic.NewPointer(resp)
|
||||
pollingCompleted := make(chan struct{})
|
||||
progressReportCompleted := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
defer close(progressReportCompleted)
|
||||
if !printProgress {
|
||||
return
|
||||
}
|
||||
cmd.Printf("Progress will be reported every %d seconds.\n", reportIntervalSeconds)
|
||||
for {
|
||||
select {
|
||||
case <-pollingCompleted:
|
||||
return
|
||||
case <-time.After(reportIntervalSeconds * time.Second):
|
||||
r := reportResponse.Load()
|
||||
if r == nil || r.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_COMPLETED {
|
||||
continue
|
||||
}
|
||||
printStatus(cmd, r)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
req := &control.GetShardEvacuationStatusRequest{
|
||||
Body: &control.GetShardEvacuationStatusRequest_Body{},
|
||||
}
|
||||
signRequest(cmd, pk, req)
|
||||
|
||||
var err error
|
||||
err = cli.ExecRaw(func(client *client.Client) error {
|
||||
resp, err = control.GetEvacuateShardStatus(client, req)
|
||||
return err
|
||||
})
|
||||
|
||||
reportResponse.Store(resp)
|
||||
|
||||
if err != nil {
|
||||
commonCmd.ExitOnErr(cmd, "Failed to get evacuate status, rpc error: %w", err)
|
||||
return
|
||||
}
|
||||
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(statusPollingInterval)
|
||||
}
|
||||
close(pollingCompleted)
|
||||
<-progressReportCompleted
|
||||
if printCompleted {
|
||||
printCompletedStatusMessage(cmd, resp)
|
||||
}
|
||||
}
|
||||
|
||||
func printCompletedStatusMessage(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
|
||||
cmd.Println("Shard evacuation has been completed.")
|
||||
sb := &strings.Builder{}
|
||||
appendShardIDs(sb, resp)
|
||||
appendCounts(sb, resp)
|
||||
appendError(sb, resp)
|
||||
appendStartedAt(sb, resp)
|
||||
appendDuration(sb, resp)
|
||||
cmd.Println(sb.String())
|
||||
}
|
||||
|
||||
func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
|
||||
if resp.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED {
|
||||
cmd.Println("There is no running or completed evacuation.")
|
||||
return
|
||||
}
|
||||
sb := &strings.Builder{}
|
||||
appendShardIDs(sb, resp)
|
||||
appendStatus(sb, resp)
|
||||
appendCounts(sb, resp)
|
||||
appendError(sb, resp)
|
||||
appendStartedAt(sb, resp)
|
||||
appendDuration(sb, resp)
|
||||
cmd.Println(sb.String())
|
||||
}
|
||||
|
||||
func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||
if resp.GetBody().GetDuration() != nil {
|
||||
duration := time.Second * time.Duration(resp.GetBody().GetDuration().GetSeconds())
|
||||
hour := int(duration.Seconds() / 3600)
|
||||
minute := int(duration.Seconds()/60) % 60
|
||||
second := int(duration.Seconds()) % 60
|
||||
sb.WriteString(fmt.Sprintf(" Duration: %02d:%02d:%02d.", hour, minute, second))
|
||||
}
|
||||
}
|
||||
|
||||
func appendStartedAt(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||
if resp.GetBody().GetStartedAt() != nil {
|
||||
startedAt := time.Unix(resp.GetBody().GetStartedAt().GetValue(), 0).UTC()
|
||||
sb.WriteString(fmt.Sprintf(" Started at: %s UTC.", startedAt.Format(time.RFC3339)))
|
||||
}
|
||||
}
|
||||
|
||||
func appendError(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||
if len(resp.Body.GetErrorMessage()) > 0 {
|
||||
sb.WriteString(fmt.Sprintf(" Error: %s.", resp.Body.GetErrorMessage()))
|
||||
}
|
||||
}
|
||||
|
||||
func appendStatus(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||
var status string
|
||||
switch resp.GetBody().GetStatus() {
|
||||
case control.GetShardEvacuationStatusResponse_Body_COMPLETED:
|
||||
status = "completed"
|
||||
case control.GetShardEvacuationStatusResponse_Body_RUNNING:
|
||||
status = "running"
|
||||
default:
|
||||
status = "undefined"
|
||||
}
|
||||
sb.WriteString(fmt.Sprintf(" Status: %s.", status))
|
||||
}
|
||||
|
||||
func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||
sb.WriteString("Shard IDs: ")
|
||||
for idx, shardID := range resp.GetBody().GetShard_ID() {
|
||||
shardIDStr := shard.NewIDFromBytes(shardID).String()
|
||||
if idx > 0 {
|
||||
sb.WriteString(", ")
|
||||
}
|
||||
sb.WriteString(shardIDStr)
|
||||
if idx == len(resp.GetBody().GetShard_ID())-1 {
|
||||
sb.WriteString(".")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
|
||||
sb.WriteString(fmt.Sprintf(" Evacuated %d object out of %d, failed to evacuate %d objects.",
|
||||
resp.GetBody().GetEvacuated(),
|
||||
resp.Body.GetTotal(),
|
||||
resp.Body.GetFailed()))
|
||||
}
|
||||
|
||||
func initControlEvacuationShardCmd() {
|
||||
evacuationShardCmd.AddCommand(startEvacuationShardCmd)
|
||||
evacuationShardCmd.AddCommand(getEvacuationShardStatusCmd)
|
||||
evacuationShardCmd.AddCommand(stopEvacuationShardCmd)
|
||||
|
||||
initControlStartEvacuationShardCmd()
|
||||
initControlFlags(getEvacuationShardStatusCmd)
|
||||
initControlFlags(stopEvacuationShardCmd)
|
||||
}
|
||||
|
||||
func initControlStartEvacuationShardCmd() {
|
||||
initControlFlags(startEvacuationShardCmd)
|
||||
|
||||
flags := startEvacuationShardCmd.Flags()
|
||||
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||
flags.Bool(shardAllFlag, false, "Process all shards")
|
||||
flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
|
||||
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
|
||||
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
|
||||
|
||||
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||
}
|
|
@ -13,13 +13,14 @@ var shardsCmd = &cobra.Command{
|
|||
func initControlShardsCmd() {
|
||||
shardsCmd.AddCommand(listShardsCmd)
|
||||
shardsCmd.AddCommand(setShardModeCmd)
|
||||
shardsCmd.AddCommand(evacuateShardCmd)
|
||||
shardsCmd.AddCommand(evacuationShardCmd)
|
||||
shardsCmd.AddCommand(flushCacheCmd)
|
||||
shardsCmd.AddCommand(doctorCmd)
|
||||
|
||||
initControlShardsListCmd()
|
||||
initControlSetShardModeCmd()
|
||||
initControlEvacuateShardCmd()
|
||||
initControlEvacuationShardCmd()
|
||||
initControlFlushCacheCmd()
|
||||
initControlDoctorCmd()
|
||||
}
|
||||
|
|
92
docs/evacuation.md
Normal file
92
docs/evacuation.md
Normal file
|
@ -0,0 +1,92 @@
|
|||
# Shard data evacuation
|
||||
|
||||
## Overview
|
||||
|
||||
Evacuation is the process of transferring data from one shard to another. Evacuation is used in case of problems with the shard in order to save data.
|
||||
|
||||
To start the evacuation, it is necessary that the shard is in read-only mode (read more [here](./shard-modes.md)).
|
||||
|
||||
First of all, by the evacuation the data is transferred to other shards of the same node; if it is not possible, then the data is transferred to other nodes.
|
||||
|
||||
Only one running evacuation process is allowed on the node at a time.
|
||||
|
||||
`frostfs-cli` utility is used to manage evacuation.
|
||||
|
||||
## Commands
|
||||
|
||||
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag.
|
||||
|
||||
`frostfs-cli control shards evacuation stop` stops running evacuation process.
|
||||
|
||||
`frostfs-cli control shards evacuation status` prints evacuation process status.
|
||||
|
||||
See commands `--help` output for detailed description.
|
||||
|
||||
## Examples
|
||||
|
||||
### Set shard mode to read only
|
||||
```bash
|
||||
frostfs-cli control shards set-mode --mode read-only --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 8kEBwtvKLU3Hva3PaaodUi
|
||||
Enter password >
|
||||
Shard mode update request successfully sent.
|
||||
```
|
||||
|
||||
### Start evacuation and get status
|
||||
```bash
|
||||
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 8kEBwtvKLU3Hva3PaaodUi
|
||||
Enter password >
|
||||
Shard evacuation has been successfully started.
|
||||
|
||||
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
|
||||
Enter password >
|
||||
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 14 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:03.
|
||||
|
||||
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
|
||||
Enter password >
|
||||
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 23 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:05.
|
||||
|
||||
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
|
||||
Enter password >
|
||||
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: completed. Evacuated 61 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:13.
|
||||
```
|
||||
|
||||
### Stop running evacuation process
|
||||
```bash
|
||||
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3
|
||||
Enter password >
|
||||
Shard evacuation has been successfully started.
|
||||
|
||||
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
|
||||
Enter password >
|
||||
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 15 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:03.
|
||||
|
||||
frostfs-cli control shards evacuation stop --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
|
||||
Enter password >
|
||||
Evacuation stopped.
|
||||
|
||||
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
|
||||
Enter password >
|
||||
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: completed. Evacuated 31 object out of 73, failed to evacuate 0 objects. Error: context canceled. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:07.
|
||||
```
|
||||
|
||||
### Start evacuation and await it completes
|
||||
```bash
|
||||
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3 --await
|
||||
Enter password >
|
||||
Shard evacuation has been successfully started.
|
||||
Progress will be reported every 5 seconds.
|
||||
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 18 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:04.
|
||||
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 43 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:09.
|
||||
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 68 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
|
||||
Shard evacuation has been completed.
|
||||
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
|
||||
```
|
||||
|
||||
### Start evacuation and await it completes without progress notifications
|
||||
```bash
|
||||
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3 --await --no-progress
|
||||
Enter password >
|
||||
Shard evacuation has been successfully started.
|
||||
Shard evacuation has been completed.
|
||||
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:20:00Z UTC. Duration: 00:00:14.
|
||||
```
|
|
@ -484,5 +484,9 @@ const (
|
|||
ShardGCCollectingExpiredLocksCompleted = "collecting expired locks completed"
|
||||
ShardGCRemoveGarbageStarted = "garbage remove started"
|
||||
ShardGCRemoveGarbageCompleted = "garbage remove completed"
|
||||
EngineShardsEvacuationFailedToCount = "failed to get total objects count to evacuate"
|
||||
EngineShardsEvacuationFailedToListObjects = "failed to list objects to evacuate"
|
||||
EngineShardsEvacuationFailedToReadObject = "failed to read object to evacuate"
|
||||
EngineShardsEvacuationFailedToMoveObject = "failed to evacuate object to other node"
|
||||
ShardGCFailedToGetExpiredWithLinked = "failed to get expired objects with linked"
|
||||
)
|
||||
|
|
|
@ -35,6 +35,7 @@ type StorageEngine struct {
|
|||
|
||||
err error
|
||||
}
|
||||
evacuateLimiter *evacuationLimiter
|
||||
}
|
||||
|
||||
type shardWrapper struct {
|
||||
|
@ -230,6 +231,9 @@ func New(opts ...Option) *StorageEngine {
|
|||
shardPools: make(map[string]util.WorkerPool),
|
||||
closeCh: make(chan struct{}),
|
||||
setModeCh: make(chan setModeRequest),
|
||||
evacuateLimiter: &evacuationLimiter{
|
||||
guard: &sync.RWMutex{},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
|
@ -14,6 +15,9 @@ import (
|
|||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/hrw"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -24,11 +28,23 @@ type EvacuateShardPrm struct {
|
|||
shardID []*shard.ID
|
||||
handler func(context.Context, oid.Address, *objectSDK.Object) error
|
||||
ignoreErrors bool
|
||||
async bool
|
||||
}
|
||||
|
||||
// EvacuateShardRes represents result of the EvacuateShard operation.
|
||||
type EvacuateShardRes struct {
|
||||
count int
|
||||
evacuated *atomic.Uint64
|
||||
total *atomic.Uint64
|
||||
failed *atomic.Uint64
|
||||
}
|
||||
|
||||
// NewEvacuateShardRes creates new EvacuateShardRes instance.
|
||||
func NewEvacuateShardRes() *EvacuateShardRes {
|
||||
return &EvacuateShardRes{
|
||||
evacuated: atomic.NewUint64(0),
|
||||
total: atomic.NewUint64(0),
|
||||
failed: atomic.NewUint64(0),
|
||||
}
|
||||
}
|
||||
|
||||
// WithShardIDList sets shard ID.
|
||||
|
@ -46,10 +62,46 @@ func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address,
|
|||
p.handler = f
|
||||
}
|
||||
|
||||
// Count returns amount of evacuated objects.
|
||||
// WithAsync sets flag to run evacuate async.
|
||||
func (p *EvacuateShardPrm) WithAsync(async bool) {
|
||||
p.async = async
|
||||
}
|
||||
|
||||
// Evacuated returns amount of evacuated objects.
|
||||
// Objects for which handler returned no error are also assumed evacuated.
|
||||
func (p EvacuateShardRes) Count() int {
|
||||
return p.count
|
||||
func (p *EvacuateShardRes) Evacuated() uint64 {
|
||||
if p == nil {
|
||||
return 0
|
||||
}
|
||||
return p.evacuated.Load()
|
||||
}
|
||||
|
||||
// Total returns total count objects to evacuate.
|
||||
func (p *EvacuateShardRes) Total() uint64 {
|
||||
if p == nil {
|
||||
return 0
|
||||
}
|
||||
return p.total.Load()
|
||||
}
|
||||
|
||||
// Failed returns count of failed objects to evacuate.
|
||||
func (p *EvacuateShardRes) Failed() uint64 {
|
||||
if p == nil {
|
||||
return 0
|
||||
}
|
||||
return p.failed.Load()
|
||||
}
|
||||
|
||||
// DeepCopy returns deep copy of result instance.
|
||||
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
return &EvacuateShardRes{
|
||||
evacuated: atomic.NewUint64(p.evacuated.Load()),
|
||||
total: atomic.NewUint64(p.total.Load()),
|
||||
failed: atomic.NewUint64(p.failed.Load()),
|
||||
}
|
||||
}
|
||||
|
||||
const defaultEvacuateBatchSize = 100
|
||||
|
@ -63,15 +115,29 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
|
|||
|
||||
// Evacuate moves data from one shard to the others.
|
||||
// The shard being moved must be in read-only mode.
|
||||
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) {
|
||||
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
shardIDs := make([]string, len(prm.shardID))
|
||||
for i := range prm.shardID {
|
||||
shardIDs[i] = prm.shardID[i].String()
|
||||
}
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
|
||||
trace.WithAttributes(
|
||||
attribute.StringSlice("shardIDs", shardIDs),
|
||||
attribute.Bool("async", prm.async),
|
||||
attribute.Bool("ignoreErrors", prm.ignoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil)
|
||||
if err != nil {
|
||||
return EvacuateShardRes{}, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
shardsToEvacuate := make(map[string]*shard.Shard)
|
||||
|
@ -83,23 +149,91 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (Eva
|
|||
}
|
||||
}
|
||||
|
||||
res := NewEvacuateShardRes()
|
||||
ctx = ctxOrBackground(ctx, prm.async)
|
||||
eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
eg.Go(func() error {
|
||||
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate)
|
||||
})
|
||||
|
||||
if prm.async {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return res, eg.Wait()
|
||||
}
|
||||
|
||||
func ctxOrBackground(ctx context.Context, background bool) context.Context {
|
||||
if background {
|
||||
return context.Background()
|
||||
}
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
|
||||
var err error
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
|
||||
trace.WithAttributes(
|
||||
attribute.StringSlice("shardIDs", shardIDs),
|
||||
attribute.Bool("async", prm.async),
|
||||
attribute.Bool("ignoreErrors", prm.ignoreErrors),
|
||||
))
|
||||
|
||||
defer func() {
|
||||
span.End()
|
||||
e.evacuateLimiter.Complete(err)
|
||||
}()
|
||||
|
||||
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs))
|
||||
|
||||
var res EvacuateShardRes
|
||||
err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
for _, shardID := range shardIDs {
|
||||
if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
|
||||
if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
|
||||
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs))
|
||||
return res, err
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs))
|
||||
return res, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount")
|
||||
defer span.End()
|
||||
|
||||
for _, sh := range shardsToEvacuate {
|
||||
cnt, err := sh.LogicalObjectsCount(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, shard.ErrDegradedMode) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
res.total.Add(cnt)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shardID", shardID),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var listPrm shard.ListWithCursorPrm
|
||||
listPrm.WithCount(defaultEvacuateBatchSize)
|
||||
|
||||
|
@ -116,6 +250,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
|
|||
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
|
||||
break
|
||||
}
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -168,6 +303,12 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
|
|||
|
||||
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
||||
trace.WithAttributes(
|
||||
attribute.Int("objects_count", len(toEvacuate)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
for i := range toEvacuate {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -182,12 +323,14 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
|||
getRes, err := sh.Get(ctx, getPrm)
|
||||
if err != nil {
|
||||
if prm.ignoreErrors {
|
||||
res.failed.Inc()
|
||||
continue
|
||||
}
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate)
|
||||
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, weights, shardsToEvacuate, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -204,15 +347,16 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
|
|||
|
||||
err = prm.handler(ctx, addr, getRes.Object())
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
res.count++
|
||||
res.evacuated.Inc()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
|
||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) (bool, error) {
|
||||
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
|
||||
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) (bool, error) {
|
||||
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
|
||||
for j := range shards {
|
||||
select {
|
||||
|
@ -227,11 +371,11 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
|
|||
putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
|
||||
if putDone || exists {
|
||||
if putDone {
|
||||
res.evacuated.Inc()
|
||||
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
|
||||
zap.Stringer("from", sh.ID()),
|
||||
zap.Stringer("to", shards[j].ID()),
|
||||
zap.Stringer("addr", addr))
|
||||
res.count++
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
@ -239,3 +383,23 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
|
|||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
return e.evacuateLimiter.GetState(), nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
return e.evacuateLimiter.CancelIfRunning()
|
||||
}
|
||||
|
|
178
pkg/local_object_storage/engine/evacuate_limiter.go
Normal file
178
pkg/local_object_storage/engine/evacuate_limiter.go
Normal file
|
@ -0,0 +1,178 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type EvacuateProcessState int
|
||||
|
||||
const (
|
||||
EvacuateProcessStateUndefined EvacuateProcessState = iota
|
||||
EvacuateProcessStateRunning
|
||||
EvacuateProcessStateCompleted
|
||||
)
|
||||
|
||||
type EvacuationState struct {
|
||||
shardIDs []string
|
||||
processState EvacuateProcessState
|
||||
startedAt time.Time
|
||||
finishedAt time.Time
|
||||
result *EvacuateShardRes
|
||||
errMessage string
|
||||
}
|
||||
|
||||
func (s *EvacuationState) ShardIDs() []string {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
return s.shardIDs
|
||||
}
|
||||
|
||||
func (s *EvacuationState) Evacuated() uint64 {
|
||||
if s == nil {
|
||||
return 0
|
||||
}
|
||||
return s.result.Evacuated()
|
||||
}
|
||||
|
||||
func (s *EvacuationState) Total() uint64 {
|
||||
if s == nil {
|
||||
return 0
|
||||
}
|
||||
return s.result.Total()
|
||||
}
|
||||
|
||||
func (s *EvacuationState) Failed() uint64 {
|
||||
if s == nil {
|
||||
return 0
|
||||
}
|
||||
return s.result.Failed()
|
||||
}
|
||||
|
||||
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {
|
||||
if s == nil {
|
||||
return EvacuateProcessStateUndefined
|
||||
}
|
||||
return s.processState
|
||||
}
|
||||
|
||||
func (s *EvacuationState) StartedAt() *time.Time {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
defaultTime := time.Time{}
|
||||
if s.startedAt == defaultTime {
|
||||
return nil
|
||||
}
|
||||
return &s.startedAt
|
||||
}
|
||||
|
||||
func (s *EvacuationState) FinishedAt() *time.Time {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
defaultTime := time.Time{}
|
||||
if s.finishedAt == defaultTime {
|
||||
return nil
|
||||
}
|
||||
return &s.finishedAt
|
||||
}
|
||||
|
||||
func (s *EvacuationState) ErrorMessage() string {
|
||||
if s == nil {
|
||||
return ""
|
||||
}
|
||||
return s.errMessage
|
||||
}
|
||||
|
||||
func (s *EvacuationState) DeepCopy() *EvacuationState {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
shardIDs := make([]string, len(s.shardIDs))
|
||||
copy(shardIDs, s.shardIDs)
|
||||
|
||||
return &EvacuationState{
|
||||
shardIDs: shardIDs,
|
||||
processState: s.processState,
|
||||
startedAt: s.startedAt,
|
||||
finishedAt: s.finishedAt,
|
||||
errMessage: s.errMessage,
|
||||
result: s.result.DeepCopy(),
|
||||
}
|
||||
}
|
||||
|
||||
type evacuationLimiter struct {
|
||||
state EvacuationState
|
||||
eg *errgroup.Group
|
||||
cancel context.CancelFunc
|
||||
|
||||
guard *sync.RWMutex
|
||||
}
|
||||
|
||||
func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, result *EvacuateShardRes) (*errgroup.Group, context.Context, error) {
|
||||
l.guard.Lock()
|
||||
defer l.guard.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if l.state.processState == EvacuateProcessStateRunning {
|
||||
return nil, nil, logicerr.New(fmt.Sprintf("evacuate is already running for shard ids %v", l.state.shardIDs))
|
||||
}
|
||||
|
||||
var egCtx context.Context
|
||||
egCtx, l.cancel = context.WithCancel(ctx)
|
||||
l.eg, egCtx = errgroup.WithContext(egCtx)
|
||||
l.state = EvacuationState{
|
||||
shardIDs: shardIDs,
|
||||
processState: EvacuateProcessStateRunning,
|
||||
startedAt: time.Now().UTC(),
|
||||
result: result,
|
||||
}
|
||||
|
||||
return l.eg, egCtx, nil
|
||||
}
|
||||
|
||||
func (l *evacuationLimiter) Complete(err error) {
|
||||
l.guard.Lock()
|
||||
defer l.guard.Unlock()
|
||||
|
||||
errMsq := ""
|
||||
if err != nil {
|
||||
errMsq = err.Error()
|
||||
}
|
||||
l.state.processState = EvacuateProcessStateCompleted
|
||||
l.state.errMessage = errMsq
|
||||
l.state.finishedAt = time.Now().UTC()
|
||||
|
||||
l.eg = nil
|
||||
}
|
||||
|
||||
func (l *evacuationLimiter) GetState() *EvacuationState {
|
||||
l.guard.RLock()
|
||||
defer l.guard.RUnlock()
|
||||
|
||||
return l.state.DeepCopy()
|
||||
}
|
||||
|
||||
func (l *evacuationLimiter) CancelIfRunning() error {
|
||||
l.guard.Lock()
|
||||
defer l.guard.Unlock()
|
||||
|
||||
if l.state.processState != EvacuateProcessStateRunning {
|
||||
return logicerr.New("there is no running evacuation task")
|
||||
}
|
||||
|
||||
l.cancel()
|
||||
return nil
|
||||
}
|
|
@ -7,6 +7,7 @@ import (
|
|||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
|
@ -21,6 +22,7 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
|
||||
|
@ -103,14 +105,14 @@ func TestEvacuateShard(t *testing.T) {
|
|||
t.Run("must be read-only", func(t *testing.T) {
|
||||
res, err := e.Evacuate(context.Background(), prm)
|
||||
require.ErrorIs(t, err, ErrMustBeReadOnly)
|
||||
require.Equal(t, 0, res.Count())
|
||||
require.Equal(t, uint64(0), res.Evacuated())
|
||||
})
|
||||
|
||||
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
|
||||
|
||||
res, err := e.Evacuate(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objPerShard, res.count)
|
||||
require.Equal(t, uint64(objPerShard), res.Evacuated())
|
||||
|
||||
// We check that all objects are available both before and after shard removal.
|
||||
// First case is a real-world use-case. It ensures that an object can be put in presense
|
||||
|
@ -121,7 +123,7 @@ func TestEvacuateShard(t *testing.T) {
|
|||
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
|
||||
res, err = e.Evacuate(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, res.count)
|
||||
require.Equal(t, uint64(0), res.Evacuated())
|
||||
|
||||
checkHasObjects(t)
|
||||
|
||||
|
@ -138,8 +140,8 @@ func TestEvacuateNetwork(t *testing.T) {
|
|||
|
||||
var errReplication = errors.New("handler error")
|
||||
|
||||
acceptOneOf := func(objects []*objectSDK.Object, max int) func(context.Context, oid.Address, *objectSDK.Object) error {
|
||||
var n int
|
||||
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error {
|
||||
var n uint64
|
||||
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error {
|
||||
if n == max {
|
||||
return errReplication
|
||||
|
@ -169,13 +171,13 @@ func TestEvacuateNetwork(t *testing.T) {
|
|||
|
||||
res, err := e.Evacuate(context.Background(), prm)
|
||||
require.ErrorIs(t, err, errMustHaveTwoShards)
|
||||
require.Equal(t, 0, res.Count())
|
||||
require.Equal(t, uint64(0), res.Evacuated())
|
||||
|
||||
prm.handler = acceptOneOf(objects, 2)
|
||||
|
||||
res, err = e.Evacuate(context.Background(), prm)
|
||||
require.ErrorIs(t, err, errReplication)
|
||||
require.Equal(t, 2, res.Count())
|
||||
require.Equal(t, uint64(2), res.Evacuated())
|
||||
})
|
||||
t.Run("multiple shards, evacuate one", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
@ -190,14 +192,14 @@ func TestEvacuateNetwork(t *testing.T) {
|
|||
|
||||
res, err := e.Evacuate(context.Background(), prm)
|
||||
require.ErrorIs(t, err, errReplication)
|
||||
require.Equal(t, 2, res.Count())
|
||||
require.Equal(t, uint64(2), res.Evacuated())
|
||||
|
||||
t.Run("no errors", func(t *testing.T) {
|
||||
prm.handler = acceptOneOf(objects, 3)
|
||||
|
||||
res, err := e.Evacuate(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, res.Count())
|
||||
require.Equal(t, uint64(3), res.Evacuated())
|
||||
})
|
||||
})
|
||||
t.Run("multiple shards, evacuate many", func(t *testing.T) {
|
||||
|
@ -205,12 +207,12 @@ func TestEvacuateNetwork(t *testing.T) {
|
|||
e, ids, objects := newEngineEvacuate(t, 4, 5)
|
||||
evacuateIDs := ids[0:3]
|
||||
|
||||
var totalCount int
|
||||
var totalCount uint64
|
||||
for i := range evacuateIDs {
|
||||
res, err := e.shards[ids[i].String()].List()
|
||||
require.NoError(t, err)
|
||||
|
||||
totalCount += len(res.AddressList())
|
||||
totalCount += uint64(len(res.AddressList()))
|
||||
}
|
||||
|
||||
for i := range ids {
|
||||
|
@ -223,14 +225,14 @@ func TestEvacuateNetwork(t *testing.T) {
|
|||
|
||||
res, err := e.Evacuate(context.Background(), prm)
|
||||
require.ErrorIs(t, err, errReplication)
|
||||
require.Equal(t, totalCount-1, res.Count())
|
||||
require.Equal(t, totalCount-1, res.Evacuated())
|
||||
|
||||
t.Run("no errors", func(t *testing.T) {
|
||||
prm.handler = acceptOneOf(objects, totalCount)
|
||||
|
||||
res, err := e.Evacuate(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, totalCount, res.Count())
|
||||
require.Equal(t, totalCount, res.Evacuated())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -258,5 +260,114 @@ func TestEvacuateCancellation(t *testing.T) {
|
|||
|
||||
res, err := e.Evacuate(ctx, prm)
|
||||
require.ErrorContains(t, err, "context canceled")
|
||||
require.Equal(t, 0, res.Count())
|
||||
require.Equal(t, uint64(0), res.Evacuated())
|
||||
}
|
||||
|
||||
func TestEvacuateSingleProcess(t *testing.T) {
|
||||
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
|
||||
blocker := make(chan interface{})
|
||||
running := make(chan interface{})
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
prm.shardID = ids[1:2]
|
||||
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
|
||||
select {
|
||||
case <-running:
|
||||
default:
|
||||
close(running)
|
||||
}
|
||||
<-blocker
|
||||
return nil
|
||||
}
|
||||
|
||||
eg, egCtx := errgroup.WithContext(context.Background())
|
||||
eg.Go(func() error {
|
||||
res, err := e.Evacuate(egCtx, prm)
|
||||
require.NoError(t, err, "first evacuation failed")
|
||||
require.Equal(t, uint64(3), res.Evacuated())
|
||||
return nil
|
||||
})
|
||||
eg.Go(func() error {
|
||||
<-running
|
||||
res, err := e.Evacuate(egCtx, prm)
|
||||
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
|
||||
require.Equal(t, uint64(0), res.Evacuated())
|
||||
close(blocker)
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, eg.Wait())
|
||||
}
|
||||
|
||||
func TestEvacuateAsync(t *testing.T) {
|
||||
e, ids, _ := newEngineEvacuate(t, 2, 3)
|
||||
|
||||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
|
||||
blocker := make(chan interface{})
|
||||
running := make(chan interface{})
|
||||
|
||||
var prm EvacuateShardPrm
|
||||
prm.shardID = ids[1:2]
|
||||
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
|
||||
select {
|
||||
case <-running:
|
||||
default:
|
||||
close(running)
|
||||
}
|
||||
<-blocker
|
||||
return nil
|
||||
}
|
||||
|
||||
st, err := e.GetEvacuationState(context.Background())
|
||||
require.NoError(t, err, "get init state failed")
|
||||
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
|
||||
require.Equal(t, uint64(0), st.Evacuated(), "invalid init count")
|
||||
require.Nil(t, st.StartedAt(), "invalid init started at")
|
||||
require.Nil(t, st.FinishedAt(), "invalid init finished at")
|
||||
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
|
||||
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
|
||||
|
||||
eg, egCtx := errgroup.WithContext(context.Background())
|
||||
eg.Go(func() error {
|
||||
res, err := e.Evacuate(egCtx, prm)
|
||||
require.NoError(t, err, "first evacuation failed")
|
||||
require.Equal(t, uint64(3), res.Evacuated())
|
||||
return nil
|
||||
})
|
||||
|
||||
<-running
|
||||
|
||||
st, err = e.GetEvacuationState(context.Background())
|
||||
require.NoError(t, err, "get running state failed")
|
||||
require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
|
||||
require.Equal(t, uint64(0), st.Evacuated(), "invalid running count")
|
||||
require.NotNil(t, st.StartedAt(), "invalid running started at")
|
||||
require.Nil(t, st.FinishedAt(), "invalid init finished at")
|
||||
expectedShardIDs := make([]string, 0, 2)
|
||||
for _, id := range ids[1:2] {
|
||||
expectedShardIDs = append(expectedShardIDs, id.String())
|
||||
}
|
||||
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids")
|
||||
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
|
||||
|
||||
close(blocker)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
st, err = e.GetEvacuationState(context.Background())
|
||||
return st.ProcessingStatus() == EvacuateProcessStateCompleted
|
||||
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
|
||||
|
||||
require.NoError(t, err, "get final state failed")
|
||||
require.Equal(t, uint64(3), st.Evacuated(), "invalid final count")
|
||||
require.NotNil(t, st.StartedAt(), "invalid final started at")
|
||||
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
|
||||
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
|
||||
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
|
||||
|
||||
require.NoError(t, eg.Wait())
|
||||
}
|
||||
|
|
31
pkg/local_object_storage/shard/count.go
Normal file
31
pkg/local_object_storage/shard/count.go
Normal file
|
@ -0,0 +1,31 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// LogicalObjectsCount returns logical objects count.
|
||||
func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "Shard.LogicalObjectsCount",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.GetMode().NoMetabase() {
|
||||
return 0, ErrDegradedMode
|
||||
}
|
||||
|
||||
cc, err := s.metaBase.ObjectCounters()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return cc.Logic(), nil
|
||||
}
|
|
@ -8,15 +8,18 @@ import (
|
|||
const serviceName = "control.ControlService"
|
||||
|
||||
const (
|
||||
rpcHealthCheck = "HealthCheck"
|
||||
rpcSetNetmapStatus = "SetNetmapStatus"
|
||||
rpcDropObjects = "DropObjects"
|
||||
rpcListShards = "ListShards"
|
||||
rpcSetShardMode = "SetShardMode"
|
||||
rpcSynchronizeTree = "SynchronizeTree"
|
||||
rpcEvacuateShard = "EvacuateShard"
|
||||
rpcFlushCache = "FlushCache"
|
||||
rpcDoctor = "Doctor"
|
||||
rpcHealthCheck = "HealthCheck"
|
||||
rpcSetNetmapStatus = "SetNetmapStatus"
|
||||
rpcDropObjects = "DropObjects"
|
||||
rpcListShards = "ListShards"
|
||||
rpcSetShardMode = "SetShardMode"
|
||||
rpcSynchronizeTree = "SynchronizeTree"
|
||||
rpcEvacuateShard = "EvacuateShard"
|
||||
rpcStartEvacuateShard = "StartEvacuateShard"
|
||||
rpcGetEvacuateShardStatus = "GetEvacuateShardStatus"
|
||||
rpcStopEvacuateShardStatus = "StopEvacuateShard"
|
||||
rpcFlushCache = "FlushCache"
|
||||
rpcDoctor = "Doctor"
|
||||
)
|
||||
|
||||
// HealthCheck executes ControlService.HealthCheck RPC.
|
||||
|
@ -141,6 +144,45 @@ func EvacuateShard(cli *client.Client, req *EvacuateShardRequest, opts ...client
|
|||
return wResp.message, nil
|
||||
}
|
||||
|
||||
// StartEvacuateShard executes ControlService.StartEvacuateShard RPC.
|
||||
func StartEvacuateShard(cli *client.Client, req *StartShardEvacuationRequest, opts ...client.CallOption) (*StartShardEvacuationResponse, error) {
|
||||
wResp := newResponseWrapper[StartShardEvacuationResponse]()
|
||||
wReq := &requestWrapper{m: req}
|
||||
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStartEvacuateShard), wReq, wResp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
||||
// GetEvacuateShardStatus executes ControlService.GetEvacuateShardStatus RPC.
|
||||
func GetEvacuateShardStatus(cli *client.Client, req *GetShardEvacuationStatusRequest, opts ...client.CallOption) (*GetShardEvacuationStatusResponse, error) {
|
||||
wResp := newResponseWrapper[GetShardEvacuationStatusResponse]()
|
||||
wReq := &requestWrapper{m: req}
|
||||
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcGetEvacuateShardStatus), wReq, wResp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
||||
// StopEvacuateShard executes ControlService.StopEvacuateShard RPC.
|
||||
func StopEvacuateShard(cli *client.Client, req *StopShardEvacuationRequest, opts ...client.CallOption) (*StopShardEvacuationResponse, error) {
|
||||
wResp := newResponseWrapper[StopShardEvacuationResponse]()
|
||||
wReq := &requestWrapper{m: req}
|
||||
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStopEvacuateShardStatus), wReq, wResp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
||||
// FlushCache executes ControlService.FlushCache RPC.
|
||||
func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallOption) (*FlushCacheResponse, error) {
|
||||
wResp := newResponseWrapper[FlushCacheResponse]()
|
||||
|
|
60
pkg/services/control/server/convert.go
Normal file
60
pkg/services/control/server/convert.go
Normal file
|
@ -0,0 +1,60 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"github.com/mr-tron/base58"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuationStatusResponse, error) {
|
||||
shardIDs := make([][]byte, 0, len(state.ShardIDs()))
|
||||
for _, shID := range state.ShardIDs() {
|
||||
id, err := base58.Decode(shID)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, fmt.Sprintf("invalid shard id format: %s", shID))
|
||||
}
|
||||
shardIDs = append(shardIDs, id)
|
||||
}
|
||||
var evacStatus control.GetShardEvacuationStatusResponse_Body_Status
|
||||
switch state.ProcessingStatus() {
|
||||
case engine.EvacuateProcessStateRunning:
|
||||
evacStatus = control.GetShardEvacuationStatusResponse_Body_RUNNING
|
||||
case engine.EvacuateProcessStateCompleted:
|
||||
evacStatus = control.GetShardEvacuationStatusResponse_Body_COMPLETED
|
||||
default:
|
||||
evacStatus = control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED
|
||||
}
|
||||
var startedAt *control.GetShardEvacuationStatusResponse_Body_UnixTimestamp
|
||||
if state.StartedAt() != nil {
|
||||
startedAt = &control.GetShardEvacuationStatusResponse_Body_UnixTimestamp{
|
||||
Value: state.StartedAt().Unix(),
|
||||
}
|
||||
}
|
||||
var duration *control.GetShardEvacuationStatusResponse_Body_Duration
|
||||
if state.StartedAt() != nil {
|
||||
end := time.Now().UTC()
|
||||
if state.FinishedAt() != nil {
|
||||
end = *state.FinishedAt()
|
||||
}
|
||||
duration = &control.GetShardEvacuationStatusResponse_Body_Duration{
|
||||
Seconds: int64(end.Sub(*state.StartedAt()).Seconds()),
|
||||
}
|
||||
}
|
||||
return &control.GetShardEvacuationStatusResponse{
|
||||
Body: &control.GetShardEvacuationStatusResponse_Body{
|
||||
Shard_ID: shardIDs,
|
||||
Evacuated: state.Evacuated(),
|
||||
Total: state.Total(),
|
||||
Failed: state.Failed(),
|
||||
Status: evacStatus,
|
||||
StartedAt: startedAt,
|
||||
Duration: duration,
|
||||
ErrorMessage: state.ErrorMessage(),
|
||||
},
|
||||
}, nil
|
||||
}
|
|
@ -37,7 +37,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
|
|||
|
||||
resp := &control.EvacuateShardResponse{
|
||||
Body: &control.EvacuateShardResponse_Body{
|
||||
Count: uint32(res.Count()),
|
||||
Count: uint32(res.Evacuated()),
|
||||
},
|
||||
}
|
||||
|
||||
|
|
97
pkg/services/control/server/evacuate_async.go
Normal file
97
pkg/services/control/server/evacuate_async.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) {
|
||||
err := s.isValidRequest(req)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
var prm engine.EvacuateShardPrm
|
||||
prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
|
||||
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
|
||||
prm.WithFaultHandler(s.replicate)
|
||||
prm.WithAsync(true)
|
||||
|
||||
_, err = s.s.Evacuate(ctx, prm)
|
||||
if err != nil {
|
||||
var logicalErr logicerr.Logical
|
||||
if errors.As(err, &logicalErr) {
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resp := &control.StartShardEvacuationResponse{
|
||||
Body: &control.StartShardEvacuationResponse_Body{},
|
||||
}
|
||||
|
||||
err = SignMessage(s.key, resp)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Server) GetShardEvacuationStatus(ctx context.Context, req *control.GetShardEvacuationStatusRequest) (*control.GetShardEvacuationStatusResponse, error) {
|
||||
err := s.isValidRequest(req)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
state, err := s.s.GetEvacuationState(ctx)
|
||||
if err != nil {
|
||||
var logicalErr logicerr.Logical
|
||||
if errors.As(err, &logicalErr) {
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resp, err := stateToResponse(state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = SignMessage(s.key, resp)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShardEvacuationRequest) (*control.StopShardEvacuationResponse, error) {
|
||||
err := s.isValidRequest(req)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
err = s.s.EnqueRunningEvacuationStop(ctx)
|
||||
if err != nil {
|
||||
var logicalErr logicerr.Logical
|
||||
if errors.As(err, &logicalErr) {
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
}
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resp := &control.StopShardEvacuationResponse{
|
||||
Body: &control.StopShardEvacuationResponse_Body{},
|
||||
}
|
||||
|
||||
err = SignMessage(s.key, resp)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
return resp, nil
|
||||
}
|
BIN
pkg/services/control/service.pb.go
generated
BIN
pkg/services/control/service.pb.go
generated
Binary file not shown.
|
@ -27,8 +27,18 @@ service ControlService {
|
|||
rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse);
|
||||
|
||||
// EvacuateShard moves all data from one shard to the others.
|
||||
// Deprecated: Use StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
|
||||
rpc EvacuateShard (EvacuateShardRequest) returns (EvacuateShardResponse);
|
||||
|
||||
// StartShardEvacuation starts moving all data from one shard to the others.
|
||||
rpc StartShardEvacuation (StartShardEvacuationRequest) returns (StartShardEvacuationResponse);
|
||||
|
||||
// GetShardEvacuationStatus returns evacuation status.
|
||||
rpc GetShardEvacuationStatus (GetShardEvacuationStatusRequest) returns (GetShardEvacuationStatusResponse);
|
||||
|
||||
// StopShardEvacuation stops moving all data from one shard to the others.
|
||||
rpc StopShardEvacuation (StopShardEvacuationRequest) returns (StopShardEvacuationResponse);
|
||||
|
||||
// FlushCache moves all data from one shard to the others.
|
||||
rpc FlushCache (FlushCacheRequest) returns (FlushCacheResponse);
|
||||
|
||||
|
@ -298,3 +308,97 @@ message DoctorResponse {
|
|||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
// StartShardEvacuation request.
|
||||
message StartShardEvacuationRequest {
|
||||
// Request body structure.
|
||||
message Body {
|
||||
// IDs of the shards.
|
||||
repeated bytes shard_ID = 1;
|
||||
// Flag indicating whether object read errors should be ignored.
|
||||
bool ignore_errors = 2;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
// StartShardEvacuation response.
|
||||
message StartShardEvacuationResponse {
|
||||
// Response body structure.
|
||||
message Body {}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
// GetShardEvacuationStatus request.
|
||||
message GetShardEvacuationStatusRequest {
|
||||
// Request body structure.
|
||||
message Body {}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
// GetShardEvacuationStatus response.
|
||||
message GetShardEvacuationStatusResponse {
|
||||
// Response body structure.
|
||||
message Body {
|
||||
// Evacuate status enum.
|
||||
enum Status {
|
||||
EVACUATE_SHARD_STATUS_UNDEFINED = 0;
|
||||
RUNNING = 1;
|
||||
COMPLETED = 2;
|
||||
}
|
||||
|
||||
// Unix timestamp value.
|
||||
message UnixTimestamp {
|
||||
int64 value = 1;
|
||||
}
|
||||
|
||||
// Duration in seconds.
|
||||
message Duration {
|
||||
int64 seconds = 1;
|
||||
}
|
||||
|
||||
// Total objects to evacuate count. The value is approximate, so evacuated + failed == total is not guaranteed after completion.
|
||||
uint64 total = 1;
|
||||
// Evacuated objects count.
|
||||
uint64 evacuated = 2;
|
||||
// Failed objects count.
|
||||
uint64 failed = 3;
|
||||
|
||||
// Shard IDs.
|
||||
repeated bytes shard_ID = 4;
|
||||
// Evacuation process status.
|
||||
Status status = 5;
|
||||
// Evacuation process duration.
|
||||
Duration duration = 6;
|
||||
// Evacuation process started at timestamp.
|
||||
UnixTimestamp started_at = 7;
|
||||
// Error message if evacuation failed.
|
||||
string error_message = 8;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
// StopShardEvacuation request.
|
||||
message StopShardEvacuationRequest {
|
||||
// Request body structure.
|
||||
message Body {}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
// StopShardEvacuation response.
|
||||
message StopShardEvacuationResponse {
|
||||
// Response body structure.
|
||||
message Body {}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
|
BIN
pkg/services/control/service_frostfs.pb.go
generated
BIN
pkg/services/control/service_frostfs.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/service_grpc.pb.go
generated
BIN
pkg/services/control/service_grpc.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/types.pb.go
generated
BIN
pkg/services/control/types.pb.go
generated
Binary file not shown.
Loading…
Reference in a new issue