Async evacuate #329

@@ -11,10 +11,11 @@ import (

const ignoreErrorsFlag = "no-errors"

var evacuateShardCmd = &cobra.Command{
	Use: "evacuate",
	Short: "Evacuate objects from shard",
	Long: "Evacuate objects from shard to other shards",
	Run: evacuateShard,
	Deprecated: "use frostfs-cli control shards evacuation start",
}

func evacuateShard(cmd *cobra.Command, _ []string) {
cmd/frostfs-cli/modules/control/evacuation.go (new file, 296 lines)

@@ -0,0 +1,296 @@
package control

import (
	"crypto/ecdsa"
	"fmt"
	"strings"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
	commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
	clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	"github.com/spf13/cobra"
	"go.uber.org/atomic"
)

const (
	awaitFlag = "await"
	noProgressFlag = "no-progress"
)

var evacuationShardCmd = &cobra.Command{
	Use: "evacuation",
	Short: "Objects evacuation from shard",
	Long: "Objects evacuation from shard to other shards",
}

var startEvacuationShardCmd = &cobra.Command{
	Use: "start",
	Short: "Start evacuate objects from shard",
	Long: "Start evacuate objects from shard to other shards",
	Run: startEvacuateShard,
}

var getEvacuationShardStatusCmd = &cobra.Command{
	Use: "status",
	Short: "Get evacuate objects from shard status",
	Long: "Get evacuate objects from shard to other shards status",
	Run: getEvacuateShardStatus,
}

var stopEvacuationShardCmd = &cobra.Command{
	Use: "stop",
	Short: "Stop running evacuate process",
	Long: "Stop running evacuate process from shard to other shards",
	Run: stopEvacuateShardStatus,
}

func startEvacuateShard(cmd *cobra.Command, _ []string) {
	pk := key.Get(cmd)

	ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)

	req := &control.StartShardEvacuationRequest{
		Body: &control.StartShardEvacuationRequest_Body{
			Shard_ID: getShardIDList(cmd),
			IgnoreErrors: ignoreErrors,
		},
	}

	signRequest(cmd, pk, req)

	cli := getClient(cmd, pk)

	var resp *control.StartShardEvacuationResponse
	var err error
	err = cli.ExecRaw(func(client *client.Client) error {
		resp, err = control.StartEvacuateShard(client, req)
		return err
	})
	commonCmd.ExitOnErr(cmd, "Start evacuate shards failed, rpc error: %w", err)

	verifyResponse(cmd, resp.GetSignature(), resp.GetBody())

	cmd.Println("Shard evacuation has been successfully started.")

	if awaitCompletion, _ := cmd.Flags().GetBool(awaitFlag); awaitCompletion {
		noProgress, _ := cmd.Flags().GetBool(noProgressFlag)
		waitEvacuateCompletion(cmd, pk, cli, !noProgress, true)
	}
}

func getEvacuateShardStatus(cmd *cobra.Command, _ []string) {
	pk := key.Get(cmd)
	req := &control.GetShardEvacuationStatusRequest{
		Body: &control.GetShardEvacuationStatusRequest_Body{},
	}

	signRequest(cmd, pk, req)

	cli := getClient(cmd, pk)

	var resp *control.GetShardEvacuationStatusResponse
	var err error
	err = cli.ExecRaw(func(client *client.Client) error {
		resp, err = control.GetEvacuateShardStatus(client, req)
		return err
	})
	commonCmd.ExitOnErr(cmd, "Get evacuate shards status failed, rpc error: %w", err)

	verifyResponse(cmd, resp.GetSignature(), resp.GetBody())

	printStatus(cmd, resp)
}

func stopEvacuateShardStatus(cmd *cobra.Command, _ []string) {
	pk := key.Get(cmd)
	req := &control.StopShardEvacuationRequest{
		Body: &control.StopShardEvacuationRequest_Body{},
	}

	signRequest(cmd, pk, req)

	cli := getClient(cmd, pk)

	var resp *control.StopShardEvacuationResponse
	var err error
	err = cli.ExecRaw(func(client *client.Client) error {
		resp, err = control.StopEvacuateShard(client, req)
		return err
	})
	commonCmd.ExitOnErr(cmd, "Stop evacuate shards failed, rpc error: %w", err)

	verifyResponse(cmd, resp.GetSignature(), resp.GetBody())

	waitEvacuateCompletion(cmd, pk, cli, false, false)

	cmd.Println("Evacuation stopped.")
}

func waitEvacuateCompletion(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *clientSDK.Client, printProgress, printCompleted bool) {
	const statusPollingInterval = 1 * time.Second
	const reportIntervalSeconds = 5
	var resp *control.GetShardEvacuationStatusResponse
	reportResponse := atomic.NewPointer(resp)
	pollingCompleted := make(chan struct{})

acid-ant commented: Is it possible to use `any` here and below?
dstepanov-yadro commented: Fixed to struct{}.

	progressReportCompleted := make(chan struct{})

fyrchik commented: Also, why `interface{}` and not `struct{}`? It seems we only close it.
dstepanov-yadro commented: Ok, struct{}. Fixed.
	go func() {

fyrchik commented: Why do we need a goroutine here? It seems we already sleep in the main loop.
dstepanov-yadro commented: Goroutine prints report every N seconds. Main loop makes request and sleeps.

		defer close(progressReportCompleted)
		if !printProgress {
			return
		}
		cmd.Printf("Progress will be reported every %d seconds.\n", reportIntervalSeconds)
		for {
			select {
			case <-pollingCompleted:
				return
			case <-time.After(reportIntervalSeconds * time.Second):
				r := reportResponse.Load()
				if r == nil || r.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_COMPLETED {
					continue
				}
				printStatus(cmd, r)
			}
		}
	}()

	for {
		req := &control.GetShardEvacuationStatusRequest{
			Body: &control.GetShardEvacuationStatusRequest_Body{},
		}
		signRequest(cmd, pk, req)

		var err error
		err = cli.ExecRaw(func(client *client.Client) error {
			resp, err = control.GetEvacuateShardStatus(client, req)
			return err
		})

		reportResponse.Store(resp)

		if err != nil {
			commonCmd.ExitOnErr(cmd, "Failed to get evacuate status, rpc error: %w", err)
			return
		}
		if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING {
			break
		}

		time.Sleep(statusPollingInterval)
	}
	close(pollingCompleted)
	<-progressReportCompleted
	if printCompleted {
		printCompletedStatusMessage(cmd, resp)
	}
}
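The review thread above asks why a separate goroutine is needed when the main loop already sleeps. A minimal standalone sketch of the same pattern may make the split clearer (hypothetical names, standard-library atomics instead of go.uber.org/atomic): the main loop only polls and stores the latest status, while the reporter goroutine prints it on its own schedule and exits once the polling channel is closed. It also shows the close-only `chan struct{}` signaling discussed in the earlier threads.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type status struct{ done int }

func main() {
	var latest atomic.Pointer[status] // last response seen by the poller

	pollingCompleted := make(chan struct{})
	reporterDone := make(chan struct{})

	// Reporter: wakes up on its own interval and prints whatever the poller stored last.
	go func() {
		defer close(reporterDone)
		for {
			select {
			case <-pollingCompleted:
				return
			case <-time.After(200 * time.Millisecond):
				if s := latest.Load(); s != nil {
					fmt.Println("progress:", s.done)
				}
			}
		}
	}()

	// Poller: fetches status (simulated here), stores it, and sleeps between requests.
	for i := 1; i <= 5; i++ {
		latest.Store(&status{done: i})
		time.Sleep(100 * time.Millisecond)
	}

	close(pollingCompleted) // closing is enough to signal: no value ever needs to be sent
	<-reporterDone
}
```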
func printCompletedStatusMessage(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
	cmd.Println("Shard evacuation has been completed.")
	sb := &strings.Builder{}
	appendShardIDs(sb, resp)
	appendCounts(sb, resp)
	appendError(sb, resp)
	appendStartedAt(sb, resp)
	appendDuration(sb, resp)
	cmd.Println(sb.String())
}

func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
	if resp.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED {
		cmd.Println("There is no running or completed evacuation.")
		return
	}
	sb := &strings.Builder{}
	appendShardIDs(sb, resp)
	appendStatus(sb, resp)
	appendCounts(sb, resp)
	appendError(sb, resp)
	appendStartedAt(sb, resp)
	appendDuration(sb, resp)
	cmd.Println(sb.String())
}

func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
	if resp.GetBody().GetDuration() != nil {
		duration := time.Second * time.Duration(resp.GetBody().GetDuration().GetSeconds())
		hour := int(duration.Seconds() / 3600)
		minute := int(duration.Seconds()/60) % 60
		second := int(duration.Seconds()) % 60
		sb.WriteString(fmt.Sprintf(" Duration: %02d:%02d:%02d.", hour, minute, second))
	}
}
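A quick worked example of the arithmetic in `appendDuration`, assuming the response reports whole seconds: 10925 seconds split into 3 hours, 2 minutes and 5 seconds, printed as `03:02:05`.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	duration := time.Second * time.Duration(10925) // e.g. a reported duration of 10925 seconds
	hour := int(duration.Seconds() / 3600)         // 3
	minute := int(duration.Seconds()/60) % 60      // 182 % 60 = 2
	second := int(duration.Seconds()) % 60         // 5
	fmt.Printf("Duration: %02d:%02d:%02d.\n", hour, minute, second) // Duration: 03:02:05.
}
```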
func appendStartedAt(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
	if resp.GetBody().GetStartedAt() != nil {
		startedAt := time.Unix(resp.GetBody().GetStartedAt().GetValue(), 0).UTC()
		sb.WriteString(fmt.Sprintf(" Started at: %s UTC.", startedAt.Format(time.RFC3339)))
	}
}

func appendError(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
	if len(resp.Body.GetErrorMessage()) > 0 {
		sb.WriteString(fmt.Sprintf(" Error: %s.", resp.Body.GetErrorMessage()))
	}
}

func appendStatus(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
	var status string
	switch resp.GetBody().GetStatus() {
	case control.GetShardEvacuationStatusResponse_Body_COMPLETED:
		status = "completed"
	case control.GetShardEvacuationStatusResponse_Body_RUNNING:
		status = "running"
	default:
		status = "undefined"
	}
	sb.WriteString(fmt.Sprintf(" Status: %s.", status))
}

func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
	sb.WriteString("Shard IDs: ")
	for idx, shardID := range resp.GetBody().GetShard_ID() {
		shardIDStr := shard.NewIDFromBytes(shardID).String()
		if idx > 0 {
			sb.WriteString(", ")
		}
		sb.WriteString(shardIDStr)
		if idx == len(resp.GetBody().GetShard_ID())-1 {
			sb.WriteString(".")
		}
	}
}

func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
	sb.WriteString(fmt.Sprintf(" Evacuated %d object out of %d, failed to evacuate %d objects.",
		resp.GetBody().GetEvacuated(),
		resp.Body.GetTotal(),
		resp.Body.GetFailed()))
}

func initControlEvacuationShardCmd() {
	evacuationShardCmd.AddCommand(startEvacuationShardCmd)
	evacuationShardCmd.AddCommand(getEvacuationShardStatusCmd)
	evacuationShardCmd.AddCommand(stopEvacuationShardCmd)

	initControlStartEvacuationShardCmd()
	initControlFlags(getEvacuationShardStatusCmd)
	initControlFlags(stopEvacuationShardCmd)
}

func initControlStartEvacuationShardCmd() {
	initControlFlags(startEvacuationShardCmd)

	flags := startEvacuationShardCmd.Flags()
	flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
	flags.Bool(shardAllFlag, false, "Process all shards")
	flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
	flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
	flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))

	startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}
@@ -13,13 +13,14 @@ var shardsCmd = &cobra.Command{

func initControlShardsCmd() {
	shardsCmd.AddCommand(listShardsCmd)
	shardsCmd.AddCommand(setShardModeCmd)
	shardsCmd.AddCommand(evacuateShardCmd)
	shardsCmd.AddCommand(evacuationShardCmd)
	shardsCmd.AddCommand(flushCacheCmd)
	shardsCmd.AddCommand(doctorCmd)

	initControlShardsListCmd()
	initControlSetShardModeCmd()
	initControlEvacuateShardCmd()
	initControlEvacuationShardCmd()
	initControlFlushCacheCmd()
	initControlDoctorCmd()
}
docs/evacuation.md (new file, 92 lines)

@@ -0,0 +1,92 @@
# Shard data evacuation

## Overview

Evacuation is the process of transferring data from one shard to another. It is used to save data when there are problems with a shard.

To start an evacuation, the shard must be in read-only mode (read more [here](./shard-modes.md)).

During evacuation, data is first transferred to other shards of the same node; if that is not possible, the data is transferred to other nodes.

realloc commented: Is it true that if one wants to migrate all the data out of a storage node, she needs to add all shards with a `shardAllFlag` toggled? If so, can we emphasize it in the documentation?
dstepanov-yadro commented: Added to the `Commands` section.

Only one running evacuation process is allowed on the node at a time.

The `frostfs-cli` utility is used to manage evacuation.

## Commands

`frostfs-cli control shards evacuation start` starts the evacuation process for the specified shards. To start evacuating all node shards, use the `--all` flag.

`frostfs-cli control shards evacuation stop` stops the running evacuation process.

`frostfs-cli control shards evacuation status` prints the evacuation process status.

See the commands' `--help` output for a detailed description.

## Examples

### Set shard mode to read-only
```bash
frostfs-cli control shards set-mode --mode read-only --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 8kEBwtvKLU3Hva3PaaodUi
Enter password >
Shard mode update request successfully sent.
```

### Start evacuation and get status
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 8kEBwtvKLU3Hva3PaaodUi
Enter password >
Shard evacuation has been successfully started.

frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 14 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:03.

frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 23 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:05.

frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: completed. Evacuated 61 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:13.
```

### Stop running evacuation process
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3
Enter password >
Shard evacuation has been successfully started.

frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 15 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:03.

frostfs-cli control shards evacuation stop --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Evacuation stopped.

frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: completed. Evacuated 31 object out of 73, failed to evacuate 0 objects. Error: context canceled. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:07.
```

### Start evacuation and await its completion
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3 --await
Enter password >
Shard evacuation has been successfully started.
Progress will be reported every 5 seconds.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 18 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:04.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 43 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:09.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 68 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
Shard evacuation has been completed.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
```

### Start evacuation and await its completion without progress notifications
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3 --await --no-progress
Enter password >
Shard evacuation has been successfully started.
Shard evacuation has been completed.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:20:00Z UTC. Duration: 00:00:14.
```
@@ -484,5 +484,9 @@ const (

	ShardGCCollectingExpiredLocksCompleted = "collecting expired locks completed"
	ShardGCRemoveGarbageStarted = "garbage remove started"
	ShardGCRemoveGarbageCompleted = "garbage remove completed"
	EngineShardsEvacuationFailedToCount = "failed to get total objects count to evacuate"
	EngineShardsEvacuationFailedToListObjects = "failed to list objects to evacuate"
	EngineShardsEvacuationFailedToReadObject = "failed to read object to evacuate"
	EngineShardsEvacuationFailedToMoveObject = "failed to evacuate object to other node"
	ShardGCFailedToGetExpiredWithLinked = "failed to get expired objects with linked"
)
@@ -35,6 +35,7 @@ type StorageEngine struct {

		err error
	}
	evacuateLimiter *evacuationLimiter
}

type shardWrapper struct {

@@ -230,6 +231,9 @@ func New(opts ...Option) *StorageEngine {

		shardPools: make(map[string]util.WorkerPool),
		closeCh: make(chan struct{}),
		setModeCh: make(chan setModeRequest),
		evacuateLimiter: &evacuationLimiter{
			guard: &sync.RWMutex{},
		},
	}
}
@@ -5,6 +5,7 @@ import (

	"errors"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"

@@ -14,6 +15,9 @@ import (

	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/hrw"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/atomic"
	"go.uber.org/zap"
)

@@ -24,11 +28,23 @@ type EvacuateShardPrm struct {

	shardID []*shard.ID
	handler func(context.Context, oid.Address, *objectSDK.Object) error
	ignoreErrors bool
	async bool
}

// EvacuateShardRes represents result of the EvacuateShard operation.
type EvacuateShardRes struct {
	count int
	evacuated *atomic.Uint64
	total *atomic.Uint64
	failed *atomic.Uint64
}

// NewEvacuateShardRes creates new EvacuateShardRes instance.
func NewEvacuateShardRes() *EvacuateShardRes {
	return &EvacuateShardRes{
		evacuated: atomic.NewUint64(0),
		total: atomic.NewUint64(0),
		failed: atomic.NewUint64(0),
	}
}

// WithShardIDList sets shard ID.
@@ -46,10 +62,46 @@ func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address,

	p.handler = f
}

// Count returns amount of evacuated objects.
// WithAsync sets flag to run evacuate async.
func (p *EvacuateShardPrm) WithAsync(async bool) {
	p.async = async
}

// Evacuated returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
func (p EvacuateShardRes) Count() int {
	return p.count
func (p *EvacuateShardRes) Evacuated() uint64 {
	if p == nil {
		return 0
	}
	return p.evacuated.Load()
}

// Total returns total count objects to evacuate.
func (p *EvacuateShardRes) Total() uint64 {
	if p == nil {
		return 0
	}
	return p.total.Load()
}

// Failed returns count of failed objects to evacuate.
func (p *EvacuateShardRes) Failed() uint64 {
	if p == nil {
		return 0
	}
	return p.failed.Load()
}

// DeepCopy returns deep copy of result instance.
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
	if p == nil {
		return nil
	}
	return &EvacuateShardRes{
		evacuated: atomic.NewUint64(p.evacuated.Load()),
		total: atomic.NewUint64(p.total.Load()),
		failed: atomic.NewUint64(p.failed.Load()),
	}
}
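The accessors above all tolerate a nil receiver. This matters because, as the changes further down show, `Evacuate` returns a nil result when started asynchronously. A minimal sketch of why that pattern is convenient for callers (hypothetical names, not the engine code):

```go
package main

import "fmt"

type result struct{ evacuated uint64 }

// Evacuated returns 0 for a nil receiver, mirroring the nil checks in EvacuateShardRes.
func (r *result) Evacuated() uint64 {
	if r == nil {
		return 0
	}
	return r.evacuated
}

func main() {
	var asyncRes *result // async start: no result object is handed back
	syncRes := &result{evacuated: 42}

	fmt.Println(asyncRes.Evacuated()) // 0, no panic and no branching at the call site
	fmt.Println(syncRes.Evacuated())  // 42
}
```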
const defaultEvacuateBatchSize = 100

@@ -63,15 +115,29 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")

// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) {
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) {
	select {

aarifullin commented: Could you explain, please, why you have put these `select`-s at the beginning of the methods? For me this `select` looks inconvenient. This check should be performed by the code that runs something asynchronously (like the errgroup below).
dstepanov-yadro commented: RPC call has deadline, so `<-ctx.Done()` can be true.
aarifullin commented: No doubt. But why don't we rely on the context cancellation on the `errgroup.Go` invocation level?
dstepanov-yadro commented: errgroup can use a detached context (context.Background()) in case of async execution, so this check will be the only one for ctx.Done.
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	shardIDs := make([]string, len(prm.shardID))
	for i := range prm.shardID {
		shardIDs[i] = prm.shardID[i].String()
	}

	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
		trace.WithAttributes(
			attribute.StringSlice("shardIDs", shardIDs),
			attribute.Bool("async", prm.async),
			attribute.Bool("ignoreErrors", prm.ignoreErrors),
		))
	defer span.End()

	shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil)
	if err != nil {
		return EvacuateShardRes{}, err
		return nil, err
	}

	shardsToEvacuate := make(map[string]*shard.Shard)

@@ -83,23 +149,91 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (Eva

		}
	}

	res := NewEvacuateShardRes()
	ctx = ctxOrBackground(ctx, prm.async)
	eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)

	if err != nil {
		return nil, err
	}

	eg.Go(func() error {
		return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate)
	})

	if prm.async {
		return nil, nil
	}

	return res, eg.Wait()
}

acid-ant commented: Is it possible to squash it in one line?
dstepanov-yadro commented: Fixed.

func ctxOrBackground(ctx context.Context, background bool) context.Context {
	if background {
		return context.Background()
	}
	return ctx
}
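The thread above explains why the ctx check stays at the top of `Evacuate`: when evacuation is started asynchronously, the worker runs on a detached `context.Background()`, so the caller's RPC deadline only guards the synchronous prologue. A small standalone sketch of that pattern (hypothetical names, not the engine code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// startJob launches work either tied to the caller's ctx (sync) or detached from it (async).
func startJob(ctx context.Context, async bool) (*errgroup.Group, error) {
	// The only ctx.Done check the async path ever sees: refuse to start
	// if the caller's deadline already expired before we detach from it.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	if async {
		ctx = context.Background() // detach: the job must outlive the RPC deadline
	}

	eg, egCtx := errgroup.WithContext(ctx)
	eg.Go(func() error {
		select {
		case <-time.After(50 * time.Millisecond): // simulated work
			return nil
		case <-egCtx.Done():
			return egCtx.Err()
		}
	})
	return eg, nil
}

func main() {
	rpcCtx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	eg, err := startJob(rpcCtx, true) // async: survives the short RPC deadline
	if err != nil {
		panic(err)
	}
	err = eg.Wait()
	fmt.Println("canceled by RPC deadline:", errors.Is(err, context.DeadlineExceeded)) // false
}
```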
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
	var err error
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
		trace.WithAttributes(
			attribute.StringSlice("shardIDs", shardIDs),
			attribute.Bool("async", prm.async),
			attribute.Bool("ignoreErrors", prm.ignoreErrors),
		))

	defer func() {
		span.End()
		e.evacuateLimiter.Complete(err)
	}()

	e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs))

	var res EvacuateShardRes
	err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
	if err != nil {
		e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err))
		return err
	}

	for _, shardID := range shardIDs {
		if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
		if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
			e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs))
			return res, err
			return err
		}
	}

	e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs))
	return res, nil
	return nil
}

func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount")
	defer span.End()

	for _, sh := range shardsToEvacuate {
		cnt, err := sh.LogicalObjectsCount(ctx)
		if err != nil {
			if errors.Is(err, shard.ErrDegradedMode) {
				continue
			}
			return err
		}
		res.total.Add(cnt)
	}
	return nil
}

func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
		trace.WithAttributes(
			attribute.String("shardID", shardID),
		))
	defer span.End()

	var listPrm shard.ListWithCursorPrm
	listPrm.WithCount(defaultEvacuateBatchSize)

@@ -116,6 +250,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E

			if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
				break
			}
			e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err))
			return err
		}

@@ -168,6 +303,12 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)

func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
		trace.WithAttributes(
			attribute.Int("objects_count", len(toEvacuate)),
		))
	defer span.End()

	for i := range toEvacuate {
		select {
		case <-ctx.Done():

@@ -182,12 +323,14 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to

		getRes, err := sh.Get(ctx, getPrm)
		if err != nil {
			if prm.ignoreErrors {
				res.failed.Inc()
				continue
			}
			e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
			return err
		}

		evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate)
		evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, weights, shardsToEvacuate, res)
		if err != nil {
			return err
		}

@@ -204,15 +347,16 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to

		err = prm.handler(ctx, addr, getRes.Object())
		if err != nil {
			e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
			return err
		}
		res.count++
		res.evacuated.Inc()
	}
	return nil
}

func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) (bool, error) {
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
	shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) (bool, error) {
	hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
	for j := range shards {
		select {

@@ -227,11 +371,11 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add

		putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
		if putDone || exists {
			if putDone {
				res.evacuated.Inc()
				e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
					zap.Stringer("from", sh.ID()),
					zap.Stringer("to", shards[j].ID()),
					zap.Stringer("addr", addr))
				res.count++
			}
			return true, nil
		}

@@ -239,3 +383,23 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add

	return false, nil
}

func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

	return e.evacuateLimiter.GetState(), nil
}

func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	return e.evacuateLimiter.CancelIfRunning()
}
pkg/local_object_storage/engine/evacuate_limiter.go (new file, 178 lines)

@@ -0,0 +1,178 @@
package engine

import (
	"context"
	"fmt"
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	"golang.org/x/sync/errgroup"
)

type EvacuateProcessState int

const (
	EvacuateProcessStateUndefined EvacuateProcessState = iota
	EvacuateProcessStateRunning
	EvacuateProcessStateCompleted
)

type EvacuationState struct {
	shardIDs []string
	processState EvacuateProcessState
	startedAt time.Time
	finishedAt time.Time
	result *EvacuateShardRes
	errMessage string
}

func (s *EvacuationState) ShardIDs() []string {
	if s == nil {
		return nil
	}
	return s.shardIDs
}

func (s *EvacuationState) Evacuated() uint64 {
	if s == nil {
		return 0
	}
	return s.result.Evacuated()
}

func (s *EvacuationState) Total() uint64 {
	if s == nil {
		return 0
	}
	return s.result.Total()
}

func (s *EvacuationState) Failed() uint64 {
	if s == nil {
		return 0
	}
	return s.result.Failed()
}

func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {
	if s == nil {
		return EvacuateProcessStateUndefined
	}
	return s.processState
}

func (s *EvacuationState) StartedAt() *time.Time {
	if s == nil {
		return nil
	}
	defaultTime := time.Time{}
	if s.startedAt == defaultTime {
		return nil
	}
	return &s.startedAt
}

func (s *EvacuationState) FinishedAt() *time.Time {
	if s == nil {
		return nil
	}
	defaultTime := time.Time{}
	if s.finishedAt == defaultTime {
		return nil
	}
	return &s.finishedAt
}

func (s *EvacuationState) ErrorMessage() string {
	if s == nil {
		return ""
	}
	return s.errMessage
}

func (s *EvacuationState) DeepCopy() *EvacuationState {
	if s == nil {
		return nil
	}
	shardIDs := make([]string, len(s.shardIDs))

fyrchik commented: Do we need to specify the _client_ polling interval here in the engine?
dstepanov-yadro commented: To reduce the load, the server can increase this interval. But now it's constant. Inspired by the OAuth2 device code flow: https://learn.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-device-code#device-authorization-response
dstepanov-yadro commented: Ok, dropped.
	copy(shardIDs, s.shardIDs)

	return &EvacuationState{
		shardIDs: shardIDs,
		processState: s.processState,
		startedAt: s.startedAt,
		finishedAt: s.finishedAt,
		errMessage: s.errMessage,

fyrchik commented: Isn't it read-only?
dstepanov-yadro commented: It is. But since the method is called DeepCopy, the copy must be deep.

		result: s.result.DeepCopy(),
	}
}

type evacuationLimiter struct {
	state EvacuationState
	eg *errgroup.Group
	cancel context.CancelFunc

	guard *sync.RWMutex
}

func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, result *EvacuateShardRes) (*errgroup.Group, context.Context, error) {
	l.guard.Lock()
	defer l.guard.Unlock()

	select {
	case <-ctx.Done():
		return nil, nil, ctx.Err()
	default:
	}

	if l.state.processState == EvacuateProcessStateRunning {
		return nil, nil, logicerr.New(fmt.Sprintf("evacuate is already running for shard ids %v", l.state.shardIDs))
	}

	var egCtx context.Context
	egCtx, l.cancel = context.WithCancel(ctx)
	l.eg, egCtx = errgroup.WithContext(egCtx)
	l.state = EvacuationState{
		shardIDs: shardIDs,
		processState: EvacuateProcessStateRunning,
		startedAt: time.Now().UTC(),
		result: result,
	}

	return l.eg, egCtx, nil
}

func (l *evacuationLimiter) Complete(err error) {
	l.guard.Lock()
	defer l.guard.Unlock()

	errMsq := ""
	if err != nil {
		errMsq = err.Error()
	}
	l.state.processState = EvacuateProcessStateCompleted
	l.state.errMessage = errMsq
	l.state.finishedAt = time.Now().UTC()

	l.eg = nil
}

func (l *evacuationLimiter) GetState() *EvacuationState {
	l.guard.RLock()
	defer l.guard.RUnlock()

	return l.state.DeepCopy()
}

func (l *evacuationLimiter) CancelIfRunning() error {
	l.guard.Lock()
	defer l.guard.Unlock()

	if l.state.processState != EvacuateProcessStateRunning {
		return logicerr.New("there is no running evacuation task")

fyrchik commented: Why do we need deepcopy here? Pointers to atomics are ok to copy.
dstepanov-yadro commented: There were two ways to ensure consistency: mutex inside the structure or deep copy. I chose the second one. This is my engineering decision.
fyrchik commented: Could you elaborate, what consistency issues do we have if we do not do a deep copy?
dstepanov-yadro commented:
```
func (l *evacuationLimiter) Complete(err error) {
	l.guard.Lock()
	defer l.guard.Unlock()
	errMsq := ""
	if err != nil {
		errMsq = err.Error()
	}
	l.state.processState = EvacuateProcessStateCompleted
	l.state.errMessage = errMsq
	l.state.finishedAt = time.Now().UTC()
	l.eg = nil
}
func (l *evacuationLimiter) GetState() *EvacuationState {
	l.guard.RLock()
	defer l.guard.RUnlock()
	return l.state.DeepCopy()
}
```
`state` can be changed by the evacuation goroutine, so without a copy we could, for example, observe a completed state without its error.

	}

	l.cancel()
	return nil
}
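A condensed sketch of how a limiter like this is meant to be used (hypothetical, heavily simplified types): `TryStart` refuses a second concurrent run, the worker reports back through `Complete`, and only then may a new run begin.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// limiter allows at most one job at a time and remembers the last outcome.
type limiter struct {
	mu      sync.Mutex
	running bool
	lastErr string
}

func (l *limiter) TryStart() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.running {
		return errors.New("already running")
	}
	l.running = true
	return nil
}

func (l *limiter) Complete(err error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.running = false
	if err != nil {
		l.lastErr = err.Error()
	}
}

func main() {
	var l limiter
	if err := l.TryStart(); err != nil {
		panic(err)
	}
	fmt.Println(l.TryStart()) // "already running": second start is rejected
	l.Complete(nil)
	fmt.Println(l.TryStart()) // <nil>: a new run may begin after completion
}
```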
@@ -7,6 +7,7 @@ import (

	"path/filepath"
	"strconv"
	"testing"
	"time"

	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"

@@ -21,6 +22,7 @@ import (

	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap/zaptest"
	"golang.org/x/sync/errgroup"
)

func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {

@@ -103,14 +105,14 @@ func TestEvacuateShard(t *testing.T) {

	t.Run("must be read-only", func(t *testing.T) {
		res, err := e.Evacuate(context.Background(), prm)
		require.ErrorIs(t, err, ErrMustBeReadOnly)
		require.Equal(t, 0, res.Count())
		require.Equal(t, uint64(0), res.Evacuated())
	})

	require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))

	res, err := e.Evacuate(context.Background(), prm)
	require.NoError(t, err)
	require.Equal(t, objPerShard, res.count)
	require.Equal(t, uint64(objPerShard), res.Evacuated())

	// We check that all objects are available both before and after shard removal.
	// First case is a real-world use-case. It ensures that an object can be put in presense

@@ -121,7 +123,7 @@ func TestEvacuateShard(t *testing.T) {

	// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
	res, err = e.Evacuate(context.Background(), prm)
	require.NoError(t, err)
	require.Equal(t, 0, res.count)
	require.Equal(t, uint64(0), res.Evacuated())

	checkHasObjects(t)

@@ -138,8 +140,8 @@ func TestEvacuateNetwork(t *testing.T) {

	var errReplication = errors.New("handler error")

	acceptOneOf := func(objects []*objectSDK.Object, max int) func(context.Context, oid.Address, *objectSDK.Object) error {
		var n int
	acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error {
		var n uint64
		return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error {
			if n == max {
				return errReplication

@@ -169,13 +171,13 @@ func TestEvacuateNetwork(t *testing.T) {

		res, err := e.Evacuate(context.Background(), prm)
		require.ErrorIs(t, err, errMustHaveTwoShards)
		require.Equal(t, 0, res.Count())
		require.Equal(t, uint64(0), res.Evacuated())

		prm.handler = acceptOneOf(objects, 2)

		res, err = e.Evacuate(context.Background(), prm)
		require.ErrorIs(t, err, errReplication)
		require.Equal(t, 2, res.Count())
		require.Equal(t, uint64(2), res.Evacuated())
	})
	t.Run("multiple shards, evacuate one", func(t *testing.T) {
		t.Parallel()

@@ -190,14 +192,14 @@ func TestEvacuateNetwork(t *testing.T) {

		res, err := e.Evacuate(context.Background(), prm)
		require.ErrorIs(t, err, errReplication)
		require.Equal(t, 2, res.Count())
		require.Equal(t, uint64(2), res.Evacuated())

		t.Run("no errors", func(t *testing.T) {
			prm.handler = acceptOneOf(objects, 3)

			res, err := e.Evacuate(context.Background(), prm)
			require.NoError(t, err)
			require.Equal(t, 3, res.Count())
			require.Equal(t, uint64(3), res.Evacuated())
		})
	})
	t.Run("multiple shards, evacuate many", func(t *testing.T) {

@@ -205,12 +207,12 @@ func TestEvacuateNetwork(t *testing.T) {

		e, ids, objects := newEngineEvacuate(t, 4, 5)
		evacuateIDs := ids[0:3]

		var totalCount int
		var totalCount uint64
		for i := range evacuateIDs {
			res, err := e.shards[ids[i].String()].List()
			require.NoError(t, err)

			totalCount += len(res.AddressList())
			totalCount += uint64(len(res.AddressList()))
		}

		for i := range ids {

@@ -223,14 +225,14 @@ func TestEvacuateNetwork(t *testing.T) {

		res, err := e.Evacuate(context.Background(), prm)
		require.ErrorIs(t, err, errReplication)
		require.Equal(t, totalCount-1, res.Count())
		require.Equal(t, totalCount-1, res.Evacuated())

		t.Run("no errors", func(t *testing.T) {
			prm.handler = acceptOneOf(objects, totalCount)

			res, err := e.Evacuate(context.Background(), prm)
			require.NoError(t, err)
			require.Equal(t, totalCount, res.Count())
			require.Equal(t, totalCount, res.Evacuated())
		})
	})
}

@@ -258,5 +260,114 @@ func TestEvacuateCancellation(t *testing.T) {

	res, err := e.Evacuate(ctx, prm)
	require.ErrorContains(t, err, "context canceled")
	require.Equal(t, 0, res.Count())
	require.Equal(t, uint64(0), res.Evacuated())
}
func TestEvacuateSingleProcess(t *testing.T) {
	e, ids, _ := newEngineEvacuate(t, 2, 3)

	require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
	require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))

	blocker := make(chan interface{})
	running := make(chan interface{})

	var prm EvacuateShardPrm
	prm.shardID = ids[1:2]
	prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
		select {
		case <-running:
		default:
			close(running)
		}
		<-blocker
		return nil
	}

	eg, egCtx := errgroup.WithContext(context.Background())
	eg.Go(func() error {
		res, err := e.Evacuate(egCtx, prm)
		require.NoError(t, err, "first evacuation failed")
		require.Equal(t, uint64(3), res.Evacuated())
		return nil
	})
	eg.Go(func() error {
		<-running
		res, err := e.Evacuate(egCtx, prm)
		require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
		require.Equal(t, uint64(0), res.Evacuated())
		close(blocker)
		return nil
	})
	require.NoError(t, eg.Wait())
}

func TestEvacuateAsync(t *testing.T) {
	e, ids, _ := newEngineEvacuate(t, 2, 3)

	require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
	require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))

	blocker := make(chan interface{})
	running := make(chan interface{})

	var prm EvacuateShardPrm
	prm.shardID = ids[1:2]
	prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
		select {
		case <-running:
		default:

fyrchik commented: We set it only one time, what about closing the channel instead?
dstepanov-yadro commented: Fixed.

			close(running)
		}
		<-blocker
		return nil
	}

	st, err := e.GetEvacuationState(context.Background())
	require.NoError(t, err, "get init state failed")
	require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
	require.Equal(t, uint64(0), st.Evacuated(), "invalid init count")
	require.Nil(t, st.StartedAt(), "invalid init started at")
	require.Nil(t, st.FinishedAt(), "invalid init finished at")
	require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
	require.Equal(t, "", st.ErrorMessage(), "invalid init error message")

	eg, egCtx := errgroup.WithContext(context.Background())
	eg.Go(func() error {
		res, err := e.Evacuate(egCtx, prm)
		require.NoError(t, err, "first evacuation failed")
		require.Equal(t, uint64(3), res.Evacuated())
		return nil
	})

	<-running

	st, err = e.GetEvacuationState(context.Background())
	require.NoError(t, err, "get running state failed")
	require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
	require.Equal(t, uint64(0), st.Evacuated(), "invalid running count")
	require.NotNil(t, st.StartedAt(), "invalid running started at")
	require.Nil(t, st.FinishedAt(), "invalid init finished at")
	expectedShardIDs := make([]string, 0, 2)
	for _, id := range ids[1:2] {
		expectedShardIDs = append(expectedShardIDs, id.String())
	}
	require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids")
	require.Equal(t, "", st.ErrorMessage(), "invalid init error message")

	close(blocker)

	require.Eventually(t, func() bool {
		st, err = e.GetEvacuationState(context.Background())
		return st.ProcessingStatus() == EvacuateProcessStateCompleted
	}, 3*time.Second, 10*time.Millisecond, "invalid final state")

	require.NoError(t, err, "get final state failed")
	require.Equal(t, uint64(3), st.Evacuated(), "invalid final count")
	require.NotNil(t, st.StartedAt(), "invalid final started at")
	require.NotNil(t, st.FinishedAt(), "invalid final finished at")
	require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
	require.Equal(t, "", st.ErrorMessage(), "invalid final error message")

	require.NoError(t, eg.Wait())
}
pkg/local_object_storage/shard/count.go (new file, 31 lines)

@@ -0,0 +1,31 @@
package shard

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// LogicalObjectsCount returns logical objects count.
func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
	_, span := tracing.StartSpanFromContext(ctx, "Shard.LogicalObjectsCount",
		trace.WithAttributes(
			attribute.String("shard_id", s.ID().String()),
		))
	defer span.End()

	s.m.RLock()
	defer s.m.RUnlock()

	if s.GetMode().NoMetabase() {
		return 0, ErrDegradedMode
	}

	cc, err := s.metaBase.ObjectCounters()
	if err != nil {
		return 0, err
	}
	return cc.Logic(), nil
}
@@ -8,15 +8,18 @@ import (

const serviceName = "control.ControlService"

const (
	rpcHealthCheck = "HealthCheck"
	rpcSetNetmapStatus = "SetNetmapStatus"
	rpcDropObjects = "DropObjects"
	rpcListShards = "ListShards"
	rpcSetShardMode = "SetShardMode"
	rpcSynchronizeTree = "SynchronizeTree"
	rpcEvacuateShard = "EvacuateShard"
	rpcStartEvacuateShard = "StartEvacuateShard"
	rpcGetEvacuateShardStatus = "GetEvacuateShardStatus"
	rpcStopEvacuateShardStatus = "StopEvacuateShard"
	rpcFlushCache = "FlushCache"
	rpcDoctor = "Doctor"
)

// HealthCheck executes ControlService.HealthCheck RPC.

@@ -141,6 +144,45 @@ func EvacuateShard(cli *client.Client, req *EvacuateShardRequest, opts ...client

	return wResp.message, nil
}

// StartEvacuateShard executes ControlService.StartEvacuateShard RPC.
func StartEvacuateShard(cli *client.Client, req *StartShardEvacuationRequest, opts ...client.CallOption) (*StartShardEvacuationResponse, error) {
	wResp := newResponseWrapper[StartShardEvacuationResponse]()
	wReq := &requestWrapper{m: req}

	err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStartEvacuateShard), wReq, wResp, opts...)
	if err != nil {
		return nil, err
	}

	return wResp.message, nil
}

// GetEvacuateShardStatus executes ControlService.GetEvacuateShardStatus RPC.
func GetEvacuateShardStatus(cli *client.Client, req *GetShardEvacuationStatusRequest, opts ...client.CallOption) (*GetShardEvacuationStatusResponse, error) {
	wResp := newResponseWrapper[GetShardEvacuationStatusResponse]()
	wReq := &requestWrapper{m: req}

	err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcGetEvacuateShardStatus), wReq, wResp, opts...)
	if err != nil {
		return nil, err
	}

	return wResp.message, nil
}

// StopEvacuateShard executes ControlService.StopEvacuateShard RPC.
func StopEvacuateShard(cli *client.Client, req *StopShardEvacuationRequest, opts ...client.CallOption) (*StopShardEvacuationResponse, error) {
	wResp := newResponseWrapper[StopShardEvacuationResponse]()
	wReq := &requestWrapper{m: req}

	err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStopEvacuateShardStatus), wReq, wResp, opts...)
	if err != nil {
		return nil, err
	}

	return wResp.message, nil
}

// FlushCache executes ControlService.FlushCache RPC.
func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallOption) (*FlushCacheResponse, error) {
	wResp := newResponseWrapper[FlushCacheResponse]()
pkg/services/control/server/convert.go (new file, 60 lines)

@@ -0,0 +1,60 @@
package control

import (
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
	"github.com/mr-tron/base58"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuationStatusResponse, error) {
	shardIDs := make([][]byte, 0, len(state.ShardIDs()))
	for _, shID := range state.ShardIDs() {
		id, err := base58.Decode(shID)
		if err != nil {
			return nil, status.Error(codes.Internal, fmt.Sprintf("invalid shard id format: %s", shID))
		}
		shardIDs = append(shardIDs, id)
	}
	var evacStatus control.GetShardEvacuationStatusResponse_Body_Status
	switch state.ProcessingStatus() {
	case engine.EvacuateProcessStateRunning:
		evacStatus = control.GetShardEvacuationStatusResponse_Body_RUNNING
	case engine.EvacuateProcessStateCompleted:
		evacStatus = control.GetShardEvacuationStatusResponse_Body_COMPLETED
	default:
		evacStatus = control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED
	}
	var startedAt *control.GetShardEvacuationStatusResponse_Body_UnixTimestamp
	if state.StartedAt() != nil {
		startedAt = &control.GetShardEvacuationStatusResponse_Body_UnixTimestamp{
			Value: state.StartedAt().Unix(),
		}
	}
	var duration *control.GetShardEvacuationStatusResponse_Body_Duration
	if state.StartedAt() != nil {

acid-ant commented: Is it possible to use one `if` statement here and above?
dstepanov-yadro commented: It is possible. But the first `if` is for the `startedAt` value, and the second `if` is for the `duration` value. It's easier for me this way, I'm old.

		end := time.Now().UTC()
		if state.FinishedAt() != nil {
			end = *state.FinishedAt()
		}
		duration = &control.GetShardEvacuationStatusResponse_Body_Duration{
			Seconds: int64(end.Sub(*state.StartedAt()).Seconds()),
		}
	}
	return &control.GetShardEvacuationStatusResponse{
		Body: &control.GetShardEvacuationStatusResponse_Body{
			Shard_ID: shardIDs,
			Evacuated: state.Evacuated(),
			Total: state.Total(),
			Failed: state.Failed(),
			Status: evacStatus,
			StartedAt: startedAt,
			Duration: duration,
			ErrorMessage: state.ErrorMessage(),
		},
	}, nil
}
@@ -37,7 +37,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe

	resp := &control.EvacuateShardResponse{
		Body: &control.EvacuateShardResponse_Body{
			Count: uint32(res.Count()),
			Count: uint32(res.Evacuated()),
		},
	}
97
pkg/services/control/server/evacuate_async.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) {
|
||||
err := s.isValidRequest(req)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
var prm engine.EvacuateShardPrm
|
||||
prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
|
||||
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
|
||||
prm.WithFaultHandler(s.replicate)
|
||||
prm.WithAsync(true)
|
||||
aarifullin
commented
For me it's obvious that `WithAsync` sets the `async` flag, so there's no need to pass the boolean argument - just `WithAsync()` - WDYT?
fyrchik
commented
It is usually more convenient because:
1. It is easier to pass down the stream (`WithAsync(async)` instead of `if async { WithAsync() }`).
2. It is easier to override (e.g. we have defaults in tests, but don't want async for one specific test).
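A small illustration of that point, using a hypothetical parameters struct that only mirrors the pattern of `engine.EvacuateShardPrm` (the real type may differ):

```go
// evacuatePrm is a hypothetical parameters struct used only to illustrate
// the boolean option-setter pattern discussed above.
type evacuatePrm struct {
    async bool
}

// WithAsync takes the flag as an argument, so callers can forward a value
// directly (prm.WithAsync(async)) instead of branching at every call site
// with "if async { prm.WithAsync() }".
func (p *evacuatePrm) WithAsync(async bool) {
    p.async = async
}
```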
    _, err = s.s.Evacuate(ctx, prm)
    if err != nil {
        var logicalErr logicerr.Logical
        if errors.As(err, &logicalErr) {
            return nil, status.Error(codes.Aborted, err.Error())
        }
        return nil, status.Error(codes.Internal, err.Error())
    }

    resp := &control.StartShardEvacuationResponse{
        Body: &control.StartShardEvacuationResponse_Body{},
    }

    err = SignMessage(s.key, resp)
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    return resp, nil
}

func (s *Server) GetShardEvacuationStatus(ctx context.Context, req *control.GetShardEvacuationStatusRequest) (*control.GetShardEvacuationStatusResponse, error) {
    err := s.isValidRequest(req)
    if err != nil {
        return nil, status.Error(codes.PermissionDenied, err.Error())
    }

    state, err := s.s.GetEvacuationState(ctx)
    if err != nil {
        var logicalErr logicerr.Logical
        if errors.As(err, &logicalErr) {
            return nil, status.Error(codes.Aborted, err.Error())
        }
        return nil, status.Error(codes.Internal, err.Error())
    }

    resp, err := stateToResponse(state)
    if err != nil {
        return nil, err
    }

    err = SignMessage(s.key, resp)
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    return resp, nil
}

func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShardEvacuationRequest) (*control.StopShardEvacuationResponse, error) {
    err := s.isValidRequest(req)
    if err != nil {
        return nil, status.Error(codes.PermissionDenied, err.Error())
    }

    err = s.s.EnqueRunningEvacuationStop(ctx)
    if err != nil {
        var logicalErr logicerr.Logical
        if errors.As(err, &logicalErr) {
            return nil, status.Error(codes.Aborted, err.Error())
        }
        return nil, status.Error(codes.Internal, err.Error())
    }

    resp := &control.StopShardEvacuationResponse{
        Body: &control.StopShardEvacuationResponse_Body{},
    }

    err = SignMessage(s.key, resp)
    if err != nil {
        return nil, status.Error(codes.Internal, err.Error())
    }
    return resp, nil
}
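For context, a minimal sketch of how a client could drive these handlers end to end: start an evacuation, then poll the status until it leaves `RUNNING`. This is not code from the PR; it uses only the generated `ControlServiceClient` API and types shown elsewhere in this diff, assumes a ready `*grpc.ClientConn`, and omits request signing (`SignMessage`), which the server requires in practice:

```go
package main

import (
    "context"
    "time"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
    "google.golang.org/grpc"
)

// evacuationExample starts evacuation of one shard and polls the status
// until the process is no longer RUNNING. Signing is intentionally omitted.
func evacuationExample(ctx context.Context, conn *grpc.ClientConn, shardID []byte) error {
    cli := control.NewControlServiceClient(conn)

    _, err := cli.StartShardEvacuation(ctx, &control.StartShardEvacuationRequest{
        Body: &control.StartShardEvacuationRequest_Body{
            Shard_ID:     [][]byte{shardID},
            IgnoreErrors: true,
        },
    })
    if err != nil {
        return err
    }

    for {
        resp, err := cli.GetShardEvacuationStatus(ctx, &control.GetShardEvacuationStatusRequest{
            Body: &control.GetShardEvacuationStatusRequest_Body{},
        })
        if err != nil {
            return err
        }
        if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING {
            return nil
        }
        time.Sleep(time.Second)
    }
}
```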
1550
pkg/services/control/service.pb.go
generated

@ -27,8 +27,18 @@ service ControlService {
  rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse);

  // EvacuateShard moves all data from one shard to the others.
  // Deprecated: Use StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
  rpc EvacuateShard (EvacuateShardRequest) returns (EvacuateShardResponse);

  // StartShardEvacuation starts moving all data from one shard to the others.
  rpc StartShardEvacuation (StartShardEvacuationRequest) returns (StartShardEvacuationResponse);
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why is it `evacuation` in CLI and `evacuate` here?
dstepanov-yadro
commented
Fixed.

  // GetShardEvacuationStatus returns evacuation status.
  rpc GetShardEvacuationStatus (GetShardEvacuationStatusRequest) returns (GetShardEvacuationStatusResponse);

  // StopShardEvacuation stops moving all data from one shard to the others.
  rpc StopShardEvacuation (StopShardEvacuationRequest) returns (StopShardEvacuationResponse);

  // FlushCache moves all data from one shard to the others.
  rpc FlushCache (FlushCacheRequest) returns (FlushCacheResponse);
@ -298,3 +308,97 @@ message DoctorResponse {
  Body body = 1;
  Signature signature = 2;
}

// StartShardEvacuation request.
message StartShardEvacuationRequest {
  // Request body structure.
  message Body {
    // IDs of the shards.
    repeated bytes shard_ID = 1;
    // Flag indicating whether object read errors should be ignored.
    bool ignore_errors = 2;
  }

  Body body = 1;
  Signature signature = 2;
}

// StartShardEvacuation response.
message StartShardEvacuationResponse {
  // Response body structure.
  message Body {}

  Body body = 1;
  Signature signature = 2;
}

// GetShardEvacuationStatus request.
message GetShardEvacuationStatusRequest {
  // Request body structure.
  message Body {}

  Body body = 1;
  Signature signature = 2;
}

// GetShardEvacuationStatus response.
message GetShardEvacuationStatusResponse {
  // Response body structure.
  message Body {
    // Evacuate status enum.
    enum Status {
      EVACUATE_SHARD_STATUS_UNDEFINED = 0;
      RUNNING = 1;
      COMPLETED = 2;
    }

    // Unix timestamp value.
    message UnixTimestamp {
      int64 value = 1;
    }

    // Duration in seconds.
    message Duration {
      int64 seconds = 1;
    }

    // Total objects to evacuate count. The value is approximate, so evacuated + failed == total is not guaranteed after completion.
    uint64 total = 1;
    // Evacuated objects count.
    uint64 evacuated = 2;
    // Failed objects count.
    uint64 failed = 3;

    // Shard IDs.
    repeated bytes shard_ID = 4;
    // Evacuation process status.
    Status status = 5;
    // Evacuation process duration.
    Duration duration = 6;
    // Evacuation process started at timestamp.
    UnixTimestamp started_at = 7;
    // Error message if evacuation failed.
    string error_message = 8;
  }

  Body body = 1;
  Signature signature = 2;
}

// StopShardEvacuation request.
message StopShardEvacuationRequest {
  // Request body structure.
  message Body {}

  Body body = 1;
  Signature signature = 2;
}

// StopShardEvacuation response.
message StopShardEvacuationResponse {
  // Response body structure.
  message Body {}

  Body body = 1;
  Signature signature = 2;
}
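Since `total` is only approximate (see the comment on that field above), consumers should not assume `evacuated + failed == total` after completion. A hedged sketch of a defensive progress calculation, assuming the generated Go getters for the status response body:

```go
package main

import "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"

// progress returns a completion ratio in [0, 1] for a status response body.
// Because total is approximate, the processed count is clamped to it.
func progress(body *control.GetShardEvacuationStatusResponse_Body) float64 {
    total := body.GetTotal()
    if total == 0 {
        return 0
    }
    processed := body.GetEvacuated() + body.GetFailed()
    if processed > total {
        processed = total
    }
    return float64(processed) / float64(total)
}
```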
516
pkg/services/control/service_frostfs.pb.go
generated
|
@ -1391,3 +1391,519 @@ func (x *DoctorResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
|||
func (x *DoctorResponse) SetSignature(sig *Signature) {
|
||||
x.Signature = sig
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *StartShardEvacuationRequest_Body) StableSize() (size int) {
|
||||
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
||||
size += proto.BoolSize(2, x.IgnoreErrors)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *StartShardEvacuationRequest_Body) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||
offset += proto.BoolMarshal(2, buf[offset:], x.IgnoreErrors)
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *StartShardEvacuationRequest) StableSize() (size int) {
|
||||
size += proto.NestedStructureSize(1, x.Body)
|
||||
size += proto.NestedStructureSize(2, x.Signature)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *StartShardEvacuationRequest) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||
return buf
|
||||
}
|
||||
|
||||
// ReadSignedData fills buf with signed data of x.
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *StartShardEvacuationRequest) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SignedDataSize returns size of the request signed data in bytes.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *StartShardEvacuationRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
func (x *StartShardEvacuationRequest) SetSignature(sig *Signature) {
|
||||
x.Signature = sig
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *StartShardEvacuationResponse_Body) StableSize() (size int) {
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *StartShardEvacuationResponse_Body) StableMarshal(buf []byte) []byte {
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *StartShardEvacuationResponse) StableSize() (size int) {
|
||||
size += proto.NestedStructureSize(1, x.Body)
|
||||
size += proto.NestedStructureSize(2, x.Signature)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *StartShardEvacuationResponse) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||
return buf
|
||||
}
|
||||
|
||||
// ReadSignedData fills buf with signed data of x.
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *StartShardEvacuationResponse) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SignedDataSize returns size of the request signed data in bytes.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *StartShardEvacuationResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
func (x *StartShardEvacuationResponse) SetSignature(sig *Signature) {
|
||||
x.Signature = sig
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *GetShardEvacuationStatusRequest_Body) StableSize() (size int) {
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *GetShardEvacuationStatusRequest_Body) StableMarshal(buf []byte) []byte {
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *GetShardEvacuationStatusRequest) StableSize() (size int) {
|
||||
size += proto.NestedStructureSize(1, x.Body)
|
||||
size += proto.NestedStructureSize(2, x.Signature)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *GetShardEvacuationStatusRequest) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||
return buf
|
||||
}
|
||||
|
||||
// ReadSignedData fills buf with signed data of x.
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *GetShardEvacuationStatusRequest) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SignedDataSize returns size of the request signed data in bytes.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *GetShardEvacuationStatusRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
func (x *GetShardEvacuationStatusRequest) SetSignature(sig *Signature) {
|
||||
x.Signature = sig
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *GetShardEvacuationStatusResponse_Body_UnixTimestamp) StableSize() (size int) {
|
||||
size += proto.Int64Size(1, x.Value)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *GetShardEvacuationStatusResponse_Body_UnixTimestamp) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.Int64Marshal(1, buf[offset:], x.Value)
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *GetShardEvacuationStatusResponse_Body_Duration) StableSize() (size int) {
|
||||
size += proto.Int64Size(1, x.Seconds)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *GetShardEvacuationStatusResponse_Body_Duration) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.Int64Marshal(1, buf[offset:], x.Seconds)
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *GetShardEvacuationStatusResponse_Body) StableSize() (size int) {
|
||||
size += proto.UInt64Size(1, x.Total)
|
||||
size += proto.UInt64Size(2, x.Evacuated)
|
||||
size += proto.UInt64Size(3, x.Failed)
|
||||
size += proto.RepeatedBytesSize(4, x.Shard_ID)
|
||||
size += proto.EnumSize(5, int32(x.Status))
|
||||
size += proto.NestedStructureSize(6, x.Duration)
|
||||
size += proto.NestedStructureSize(7, x.StartedAt)
|
||||
size += proto.StringSize(8, x.ErrorMessage)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *GetShardEvacuationStatusResponse_Body) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.UInt64Marshal(1, buf[offset:], x.Total)
|
||||
offset += proto.UInt64Marshal(2, buf[offset:], x.Evacuated)
|
||||
offset += proto.UInt64Marshal(3, buf[offset:], x.Failed)
|
||||
offset += proto.RepeatedBytesMarshal(4, buf[offset:], x.Shard_ID)
|
||||
offset += proto.EnumMarshal(5, buf[offset:], int32(x.Status))
|
||||
offset += proto.NestedStructureMarshal(6, buf[offset:], x.Duration)
|
||||
offset += proto.NestedStructureMarshal(7, buf[offset:], x.StartedAt)
|
||||
offset += proto.StringMarshal(8, buf[offset:], x.ErrorMessage)
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *GetShardEvacuationStatusResponse) StableSize() (size int) {
|
||||
size += proto.NestedStructureSize(1, x.Body)
|
||||
size += proto.NestedStructureSize(2, x.Signature)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *GetShardEvacuationStatusResponse) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||
return buf
|
||||
}
|
||||
|
||||
// ReadSignedData fills buf with signed data of x.
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *GetShardEvacuationStatusResponse) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SignedDataSize returns size of the request signed data in bytes.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *GetShardEvacuationStatusResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
func (x *GetShardEvacuationStatusResponse) SetSignature(sig *Signature) {
|
||||
x.Signature = sig
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *StopShardEvacuationRequest_Body) StableSize() (size int) {
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *StopShardEvacuationRequest_Body) StableMarshal(buf []byte) []byte {
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *StopShardEvacuationRequest) StableSize() (size int) {
|
||||
size += proto.NestedStructureSize(1, x.Body)
|
||||
size += proto.NestedStructureSize(2, x.Signature)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *StopShardEvacuationRequest) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||
return buf
|
||||
}
|
||||
|
||||
// ReadSignedData fills buf with signed data of x.
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *StopShardEvacuationRequest) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SignedDataSize returns size of the request signed data in bytes.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *StopShardEvacuationRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
func (x *StopShardEvacuationRequest) SetSignature(sig *Signature) {
|
||||
x.Signature = sig
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *StopShardEvacuationResponse_Body) StableSize() (size int) {
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *StopShardEvacuationResponse_Body) StableMarshal(buf []byte) []byte {
|
||||
return buf
|
||||
}
|
||||
|
||||
// StableSize returns the size of x in protobuf format.
|
||||
//
|
||||
// Structures with the same field values have the same binary size.
|
||||
func (x *StopShardEvacuationResponse) StableSize() (size int) {
|
||||
size += proto.NestedStructureSize(1, x.Body)
|
||||
size += proto.NestedStructureSize(2, x.Signature)
|
||||
return size
|
||||
}
|
||||
|
||||
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||
//
|
||||
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same binary format.
|
||||
func (x *StopShardEvacuationResponse) StableMarshal(buf []byte) []byte {
|
||||
if x == nil {
|
||||
return []byte{}
|
||||
}
|
||||
if buf == nil {
|
||||
buf = make([]byte, x.StableSize())
|
||||
}
|
||||
var offset int
|
||||
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||
return buf
|
||||
}
|
||||
|
||||
// ReadSignedData fills buf with signed data of x.
|
||||
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||
//
|
||||
// Returns any error encountered which did not allow writing the data completely.
|
||||
// Otherwise, returns the buffer in which the data is written.
|
||||
//
|
||||
// Structures with the same field values have the same signed data.
|
||||
func (x *StopShardEvacuationResponse) SignedDataSize() int {
|
||||
return x.GetBody().StableSize()
|
||||
}
|
||||
|
||||
// SignedDataSize returns size of the request signed data in bytes.
|
||||
//
|
||||
// Structures with the same field values have the same signed data size.
|
||||
func (x *StopShardEvacuationResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
||||
return x.GetBody().StableMarshal(buf), nil
|
||||
}
|
||||
|
||||
func (x *StopShardEvacuationResponse) SetSignature(sig *Signature) {
|
||||
x.Signature = sig
|
||||
}
|
||||
|
|
171
pkg/services/control/service_grpc.pb.go
generated
|
@ -1,7 +1,7 @@
|
|||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.2.0
|
||||
// - protoc v3.12.4
|
||||
// - protoc-gen-go-grpc v1.3.0
|
||||
// - protoc v3.21.9
|
||||
// source: pkg/services/control/service.proto
|
||||
|
||||
package control
|
||||
|
@ -18,6 +18,21 @@ import (
|
|||
// Requires gRPC-Go v1.32.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion7
|
||||
|
||||
const (
|
||||
ControlService_HealthCheck_FullMethodName = "/control.ControlService/HealthCheck"
|
||||
ControlService_SetNetmapStatus_FullMethodName = "/control.ControlService/SetNetmapStatus"
|
||||
ControlService_DropObjects_FullMethodName = "/control.ControlService/DropObjects"
|
||||
ControlService_ListShards_FullMethodName = "/control.ControlService/ListShards"
|
||||
ControlService_SetShardMode_FullMethodName = "/control.ControlService/SetShardMode"
|
||||
ControlService_SynchronizeTree_FullMethodName = "/control.ControlService/SynchronizeTree"
|
||||
ControlService_EvacuateShard_FullMethodName = "/control.ControlService/EvacuateShard"
|
||||
ControlService_StartShardEvacuation_FullMethodName = "/control.ControlService/StartShardEvacuation"
|
||||
ControlService_GetShardEvacuationStatus_FullMethodName = "/control.ControlService/GetShardEvacuationStatus"
|
||||
ControlService_StopShardEvacuation_FullMethodName = "/control.ControlService/StopShardEvacuation"
|
||||
ControlService_FlushCache_FullMethodName = "/control.ControlService/FlushCache"
|
||||
ControlService_Doctor_FullMethodName = "/control.ControlService/Doctor"
|
||||
)
|
||||
|
||||
// ControlServiceClient is the client API for ControlService service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
|
@ -35,7 +50,14 @@ type ControlServiceClient interface {
|
|||
// Synchronizes all log operations for the specified tree.
|
||||
SynchronizeTree(ctx context.Context, in *SynchronizeTreeRequest, opts ...grpc.CallOption) (*SynchronizeTreeResponse, error)
|
||||
// EvacuateShard moves all data from one shard to the others.
|
||||
// Deprecated: Use StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
|
||||
EvacuateShard(ctx context.Context, in *EvacuateShardRequest, opts ...grpc.CallOption) (*EvacuateShardResponse, error)
|
||||
// StartShardEvacuation starts moving all data from one shard to the others.
|
||||
StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error)
|
||||
// GetShardEvacuationStatus returns evacuation status.
|
||||
GetShardEvacuationStatus(ctx context.Context, in *GetShardEvacuationStatusRequest, opts ...grpc.CallOption) (*GetShardEvacuationStatusResponse, error)
|
||||
// StopShardEvacuation stops moving all data from one shard to the others.
|
||||
StopShardEvacuation(ctx context.Context, in *StopShardEvacuationRequest, opts ...grpc.CallOption) (*StopShardEvacuationResponse, error)
|
||||
// FlushCache moves all data from one shard to the others.
|
||||
FlushCache(ctx context.Context, in *FlushCacheRequest, opts ...grpc.CallOption) (*FlushCacheResponse, error)
|
||||
// Doctor performs storage restructuring operations on engine.
|
||||
|
@ -52,7 +74,7 @@ func NewControlServiceClient(cc grpc.ClientConnInterface) ControlServiceClient {
|
|||
|
||||
func (c *controlServiceClient) HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
|
||||
out := new(HealthCheckResponse)
|
||||
err := c.cc.Invoke(ctx, "/control.ControlService/HealthCheck", in, out, opts...)
|
||||
err := c.cc.Invoke(ctx, ControlService_HealthCheck_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -61,7 +83,7 @@ func (c *controlServiceClient) HealthCheck(ctx context.Context, in *HealthCheckR
|
|||
|
||||
func (c *controlServiceClient) SetNetmapStatus(ctx context.Context, in *SetNetmapStatusRequest, opts ...grpc.CallOption) (*SetNetmapStatusResponse, error) {
|
||||
out := new(SetNetmapStatusResponse)
|
||||
err := c.cc.Invoke(ctx, "/control.ControlService/SetNetmapStatus", in, out, opts...)
|
||||
err := c.cc.Invoke(ctx, ControlService_SetNetmapStatus_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -70,7 +92,7 @@ func (c *controlServiceClient) SetNetmapStatus(ctx context.Context, in *SetNetma
|
|||
|
||||
func (c *controlServiceClient) DropObjects(ctx context.Context, in *DropObjectsRequest, opts ...grpc.CallOption) (*DropObjectsResponse, error) {
|
||||
out := new(DropObjectsResponse)
|
||||
err := c.cc.Invoke(ctx, "/control.ControlService/DropObjects", in, out, opts...)
|
||||
err := c.cc.Invoke(ctx, ControlService_DropObjects_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -79,7 +101,7 @@ func (c *controlServiceClient) DropObjects(ctx context.Context, in *DropObjectsR
|
|||
|
||||
func (c *controlServiceClient) ListShards(ctx context.Context, in *ListShardsRequest, opts ...grpc.CallOption) (*ListShardsResponse, error) {
|
||||
out := new(ListShardsResponse)
|
||||
err := c.cc.Invoke(ctx, "/control.ControlService/ListShards", in, out, opts...)
|
||||
err := c.cc.Invoke(ctx, ControlService_ListShards_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -88,7 +110,7 @@ func (c *controlServiceClient) ListShards(ctx context.Context, in *ListShardsReq
|
|||
|
||||
func (c *controlServiceClient) SetShardMode(ctx context.Context, in *SetShardModeRequest, opts ...grpc.CallOption) (*SetShardModeResponse, error) {
|
||||
out := new(SetShardModeResponse)
|
||||
err := c.cc.Invoke(ctx, "/control.ControlService/SetShardMode", in, out, opts...)
|
||||
err := c.cc.Invoke(ctx, ControlService_SetShardMode_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -97,7 +119,7 @@ func (c *controlServiceClient) SetShardMode(ctx context.Context, in *SetShardMod
|
|||
|
||||
func (c *controlServiceClient) SynchronizeTree(ctx context.Context, in *SynchronizeTreeRequest, opts ...grpc.CallOption) (*SynchronizeTreeResponse, error) {
|
||||
out := new(SynchronizeTreeResponse)
|
||||
err := c.cc.Invoke(ctx, "/control.ControlService/SynchronizeTree", in, out, opts...)
|
||||
err := c.cc.Invoke(ctx, ControlService_SynchronizeTree_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -106,7 +128,34 @@ func (c *controlServiceClient) SynchronizeTree(ctx context.Context, in *Synchron
|
|||
|
||||
func (c *controlServiceClient) EvacuateShard(ctx context.Context, in *EvacuateShardRequest, opts ...grpc.CallOption) (*EvacuateShardResponse, error) {
|
||||
out := new(EvacuateShardResponse)
|
||||
err := c.cc.Invoke(ctx, "/control.ControlService/EvacuateShard", in, out, opts...)
|
||||
err := c.cc.Invoke(ctx, ControlService_EvacuateShard_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *controlServiceClient) StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error) {
|
||||
out := new(StartShardEvacuationResponse)
|
||||
err := c.cc.Invoke(ctx, ControlService_StartShardEvacuation_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *controlServiceClient) GetShardEvacuationStatus(ctx context.Context, in *GetShardEvacuationStatusRequest, opts ...grpc.CallOption) (*GetShardEvacuationStatusResponse, error) {
|
||||
out := new(GetShardEvacuationStatusResponse)
|
||||
err := c.cc.Invoke(ctx, ControlService_GetShardEvacuationStatus_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *controlServiceClient) StopShardEvacuation(ctx context.Context, in *StopShardEvacuationRequest, opts ...grpc.CallOption) (*StopShardEvacuationResponse, error) {
|
||||
out := new(StopShardEvacuationResponse)
|
||||
err := c.cc.Invoke(ctx, ControlService_StopShardEvacuation_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -115,7 +164,7 @@ func (c *controlServiceClient) EvacuateShard(ctx context.Context, in *EvacuateSh
|
|||
|
||||
func (c *controlServiceClient) FlushCache(ctx context.Context, in *FlushCacheRequest, opts ...grpc.CallOption) (*FlushCacheResponse, error) {
|
||||
out := new(FlushCacheResponse)
|
||||
err := c.cc.Invoke(ctx, "/control.ControlService/FlushCache", in, out, opts...)
|
||||
err := c.cc.Invoke(ctx, ControlService_FlushCache_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -124,7 +173,7 @@ func (c *controlServiceClient) FlushCache(ctx context.Context, in *FlushCacheReq
|
|||
|
||||
func (c *controlServiceClient) Doctor(ctx context.Context, in *DoctorRequest, opts ...grpc.CallOption) (*DoctorResponse, error) {
|
||||
out := new(DoctorResponse)
|
||||
err := c.cc.Invoke(ctx, "/control.ControlService/Doctor", in, out, opts...)
|
||||
err := c.cc.Invoke(ctx, ControlService_Doctor_FullMethodName, in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -148,7 +197,14 @@ type ControlServiceServer interface {
|
|||
// Synchronizes all log operations for the specified tree.
|
||||
SynchronizeTree(context.Context, *SynchronizeTreeRequest) (*SynchronizeTreeResponse, error)
|
||||
// EvacuateShard moves all data from one shard to the others.
|
||||
// Deprecated: Use StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
|
||||
EvacuateShard(context.Context, *EvacuateShardRequest) (*EvacuateShardResponse, error)
|
||||
// StartShardEvacuation starts moving all data from one shard to the others.
|
||||
StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error)
|
||||
// GetShardEvacuationStatus returns evacuation status.
|
||||
GetShardEvacuationStatus(context.Context, *GetShardEvacuationStatusRequest) (*GetShardEvacuationStatusResponse, error)
|
||||
// StopShardEvacuation stops moving all data from one shard to the others.
|
||||
StopShardEvacuation(context.Context, *StopShardEvacuationRequest) (*StopShardEvacuationResponse, error)
|
||||
// FlushCache moves all data from one shard to the others.
|
||||
FlushCache(context.Context, *FlushCacheRequest) (*FlushCacheResponse, error)
|
||||
// Doctor performs storage restructuring operations on engine.
|
||||
|
@ -180,6 +236,15 @@ func (UnimplementedControlServiceServer) SynchronizeTree(context.Context, *Synch
|
|||
func (UnimplementedControlServiceServer) EvacuateShard(context.Context, *EvacuateShardRequest) (*EvacuateShardResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method EvacuateShard not implemented")
|
||||
}
|
||||
func (UnimplementedControlServiceServer) StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method StartShardEvacuation not implemented")
|
||||
}
|
||||
func (UnimplementedControlServiceServer) GetShardEvacuationStatus(context.Context, *GetShardEvacuationStatusRequest) (*GetShardEvacuationStatusResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method GetShardEvacuationStatus not implemented")
|
||||
}
|
||||
func (UnimplementedControlServiceServer) StopShardEvacuation(context.Context, *StopShardEvacuationRequest) (*StopShardEvacuationResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method StopShardEvacuation not implemented")
|
||||
}
|
||||
func (UnimplementedControlServiceServer) FlushCache(context.Context, *FlushCacheRequest) (*FlushCacheResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method FlushCache not implemented")
|
||||
}
|
||||
|
@ -208,7 +273,7 @@ func _ControlService_HealthCheck_Handler(srv interface{}, ctx context.Context, d
|
|||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/control.ControlService/HealthCheck",
|
||||
FullMethod: ControlService_HealthCheck_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).HealthCheck(ctx, req.(*HealthCheckRequest))
|
||||
|
@ -226,7 +291,7 @@ func _ControlService_SetNetmapStatus_Handler(srv interface{}, ctx context.Contex
|
|||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/control.ControlService/SetNetmapStatus",
|
||||
FullMethod: ControlService_SetNetmapStatus_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).SetNetmapStatus(ctx, req.(*SetNetmapStatusRequest))
|
||||
|
@ -244,7 +309,7 @@ func _ControlService_DropObjects_Handler(srv interface{}, ctx context.Context, d
|
|||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/control.ControlService/DropObjects",
|
||||
FullMethod: ControlService_DropObjects_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).DropObjects(ctx, req.(*DropObjectsRequest))
|
||||
|
@ -262,7 +327,7 @@ func _ControlService_ListShards_Handler(srv interface{}, ctx context.Context, de
|
|||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/control.ControlService/ListShards",
|
||||
FullMethod: ControlService_ListShards_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).ListShards(ctx, req.(*ListShardsRequest))
|
||||
|
@ -280,7 +345,7 @@ func _ControlService_SetShardMode_Handler(srv interface{}, ctx context.Context,
|
|||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/control.ControlService/SetShardMode",
|
||||
FullMethod: ControlService_SetShardMode_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).SetShardMode(ctx, req.(*SetShardModeRequest))
|
||||
|
@ -298,7 +363,7 @@ func _ControlService_SynchronizeTree_Handler(srv interface{}, ctx context.Contex
|
|||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/control.ControlService/SynchronizeTree",
|
||||
FullMethod: ControlService_SynchronizeTree_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).SynchronizeTree(ctx, req.(*SynchronizeTreeRequest))
|
||||
|
@ -316,7 +381,7 @@ func _ControlService_EvacuateShard_Handler(srv interface{}, ctx context.Context,
|
|||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/control.ControlService/EvacuateShard",
|
||||
FullMethod: ControlService_EvacuateShard_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).EvacuateShard(ctx, req.(*EvacuateShardRequest))
|
||||
|
@ -324,6 +389,60 @@ func _ControlService_EvacuateShard_Handler(srv interface{}, ctx context.Context,
|
|||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ControlService_StartShardEvacuation_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(StartShardEvacuationRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ControlServiceServer).StartShardEvacuation(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ControlService_StartShardEvacuation_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).StartShardEvacuation(ctx, req.(*StartShardEvacuationRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ControlService_GetShardEvacuationStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(GetShardEvacuationStatusRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ControlServiceServer).GetShardEvacuationStatus(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ControlService_GetShardEvacuationStatus_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).GetShardEvacuationStatus(ctx, req.(*GetShardEvacuationStatusRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ControlService_StopShardEvacuation_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(StopShardEvacuationRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ControlServiceServer).StopShardEvacuation(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: ControlService_StopShardEvacuation_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).StopShardEvacuation(ctx, req.(*StopShardEvacuationRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ControlService_FlushCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(FlushCacheRequest)
|
||||
if err := dec(in); err != nil {
|
||||
|
@ -334,7 +453,7 @@ func _ControlService_FlushCache_Handler(srv interface{}, ctx context.Context, de
|
|||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/control.ControlService/FlushCache",
|
||||
FullMethod: ControlService_FlushCache_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).FlushCache(ctx, req.(*FlushCacheRequest))
|
||||
|
@ -352,7 +471,7 @@ func _ControlService_Doctor_Handler(srv interface{}, ctx context.Context, dec fu
|
|||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/control.ControlService/Doctor",
|
||||
FullMethod: ControlService_Doctor_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ControlServiceServer).Doctor(ctx, req.(*DoctorRequest))
|
||||
|
@ -395,6 +514,18 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
|
|||
MethodName: "EvacuateShard",
|
||||
Handler: _ControlService_EvacuateShard_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "StartShardEvacuation",
|
||||
Handler: _ControlService_StartShardEvacuation_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "GetShardEvacuationStatus",
|
||||
Handler: _ControlService_GetShardEvacuationStatus_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "StopShardEvacuation",
|
||||
Handler: _ControlService_StopShardEvacuation_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "FlushCache",
|
||||
Handler: _ControlService_FlushCache_Handler,
|
||||
|
|
2
pkg/services/control/types.pb.go
generated
|
@ -1,7 +1,7 @@
|
|||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.26.0
|
||||
// protoc v3.12.4
|
||||
// protoc v3.21.9
|
||||
// source: pkg/services/control/types.proto
|
||||
|
||||
package control
|
||||
|
|
Pooling or polling?
Polling. Fixed.