Async evacuate #329

Merged
fyrchik merged 4 commits from dstepanov-yadro/frostfs-node:feat/async-evacuate into master 2023-05-19 08:43:54 +00:00
19 changed files with 3237 additions and 275 deletions

View file

@ -11,10 +11,11 @@ import (
const ignoreErrorsFlag = "no-errors"
var evacuateShardCmd = &cobra.Command{
Use: "evacuate",
Short: "Evacuate objects from shard",
Long: "Evacuate objects from shard to other shards",
Run: evacuateShard,
Use: "evacuate",
Short: "Evacuate objects from shard",
Long: "Evacuate objects from shard to other shards",
Run: evacuateShard,
Deprecated: "use frostfs-cli control shards evacuation start",
}
func evacuateShard(cmd *cobra.Command, _ []string) {

View file

@ -0,0 +1,296 @@
package control
import (
"crypto/ecdsa"
"fmt"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
clientSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"github.com/spf13/cobra"
"go.uber.org/atomic"
)
const (
awaitFlag = "await"
noProgressFlag = "no-progress"
)
var evacuationShardCmd = &cobra.Command{
Use: "evacuation",
Short: "Objects evacuation from shard",
Long: "Objects evacuation from shard to other shards",
}
var startEvacuationShardCmd = &cobra.Command{
Use: "start",
Short: "Start evacuate objects from shard",
Long: "Start evacuate objects from shard to other shards",
Run: startEvacuateShard,
}
var getEvacuationShardStatusCmd = &cobra.Command{
Use: "status",
Short: "Get evacuate objects from shard status",
Long: "Get evacuate objects from shard to other shards status",
Run: getEvacuateShardStatus,
}
var stopEvacuationShardCmd = &cobra.Command{
Use: "stop",
Short: "Stop running evacuate process",
Long: "Stop running evacuate process from shard to other shards",
Run: stopEvacuateShardStatus,
}
func startEvacuateShard(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
req := &control.StartShardEvacuationRequest{
Body: &control.StartShardEvacuationRequest_Body{
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
},
}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.StartShardEvacuationResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.StartEvacuateShard(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "Start evacuate shards failed, rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
cmd.Println("Shard evacuation has been successfully started.")
if awaitCompletion, _ := cmd.Flags().GetBool(awaitFlag); awaitCompletion {
noProgress, _ := cmd.Flags().GetBool(noProgressFlag)
waitEvacuateCompletion(cmd, pk, cli, !noProgress, true)
}
}
func getEvacuateShardStatus(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.GetShardEvacuationStatusRequest{
Body: &control.GetShardEvacuationStatusRequest_Body{},
}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.GetShardEvacuationStatusResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.GetEvacuateShardStatus(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "Get evacuate shards status failed, rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
printStatus(cmd, resp)
}
func stopEvacuateShardStatus(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.StopShardEvacuationRequest{
Body: &control.StopShardEvacuationRequest_Body{},
}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.StopShardEvacuationResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.StopEvacuateShard(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "Stop evacuate shards failed, rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
waitEvacuateCompletion(cmd, pk, cli, false, false)
cmd.Println("Evacuation stopped.")
}
func waitEvacuateCompletion(cmd *cobra.Command, pk *ecdsa.PrivateKey, cli *clientSDK.Client, printProgress, printCompleted bool) {
const statusPollingInterval = 1 * time.Second
const reportIntervalSeconds = 5
var resp *control.GetShardEvacuationStatusResponse
reportResponse := atomic.NewPointer(resp)
pollingCompleted := make(chan struct{})
fyrchik marked this conversation as resolved Outdated

pooling or polling?

pooling or polling?

polling. fixed.

polling. fixed.

Is it possible to use any here and below?

Is it possible to use `any` here and below?

fixed to struct{}

fixed to struct{}
progressReportCompleted := make(chan struct{})
fyrchik marked this conversation as resolved Outdated

Also, why interface{} and not struct{}? It seems we only close it.

Also, why `interface{}` and not `struct{}`? It seems we only close it.

Ok, struct{}. Fixed.

Ok, struct{}. Fixed.
go func() {
fyrchik marked this conversation as resolved Outdated

Why do we need a goroutine here? It seems we already sleep in the main loop.

Why do we need a goroutine here? It seems we already sleep in the main loop.

Goroutine prints report every N seconds. Main loop makes request and sleeps.

Goroutine prints report every N seconds. Main loop makes request and sleeps.
defer close(progressReportCompleted)
if !printProgress {
return
}
cmd.Printf("Progress will be reported every %d seconds.\n", reportIntervalSeconds)
for {
select {
case <-pollingCompleted:
return
case <-time.After(reportIntervalSeconds * time.Second):
r := reportResponse.Load()
if r == nil || r.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_COMPLETED {
continue
}
printStatus(cmd, r)
}
}
}()
for {
req := &control.GetShardEvacuationStatusRequest{
Body: &control.GetShardEvacuationStatusRequest_Body{},
}
signRequest(cmd, pk, req)
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.GetEvacuateShardStatus(client, req)
return err
})
reportResponse.Store(resp)
if err != nil {
commonCmd.ExitOnErr(cmd, "Failed to get evacuate status, rpc error: %w", err)
return
}
if resp.GetBody().GetStatus() != control.GetShardEvacuationStatusResponse_Body_RUNNING {
break
}
time.Sleep(statusPollingInterval)
}
close(pollingCompleted)
<-progressReportCompleted
if printCompleted {
printCompletedStatusMessage(cmd, resp)
}
}
func printCompletedStatusMessage(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
cmd.Println("Shard evacuation has been completed.")
sb := &strings.Builder{}
appendShardIDs(sb, resp)
appendCounts(sb, resp)
appendError(sb, resp)
appendStartedAt(sb, resp)
appendDuration(sb, resp)
cmd.Println(sb.String())
}
func printStatus(cmd *cobra.Command, resp *control.GetShardEvacuationStatusResponse) {
if resp.GetBody().GetStatus() == control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED {
cmd.Println("There is no running or completed evacuation.")
return
}
sb := &strings.Builder{}
appendShardIDs(sb, resp)
appendStatus(sb, resp)
appendCounts(sb, resp)
appendError(sb, resp)
appendStartedAt(sb, resp)
appendDuration(sb, resp)
cmd.Println(sb.String())
}
func appendDuration(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
if resp.GetBody().GetDuration() != nil {
duration := time.Second * time.Duration(resp.GetBody().GetDuration().GetSeconds())
hour := int(duration.Seconds() / 3600)
minute := int(duration.Seconds()/60) % 60
second := int(duration.Seconds()) % 60
sb.WriteString(fmt.Sprintf(" Duration: %02d:%02d:%02d.", hour, minute, second))
}
}
func appendStartedAt(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
if resp.GetBody().GetStartedAt() != nil {
startedAt := time.Unix(resp.GetBody().GetStartedAt().GetValue(), 0).UTC()
sb.WriteString(fmt.Sprintf(" Started at: %s UTC.", startedAt.Format(time.RFC3339)))
}
}
func appendError(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
if len(resp.Body.GetErrorMessage()) > 0 {
sb.WriteString(fmt.Sprintf(" Error: %s.", resp.Body.GetErrorMessage()))
}
}
func appendStatus(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
var status string
switch resp.GetBody().GetStatus() {
case control.GetShardEvacuationStatusResponse_Body_COMPLETED:
status = "completed"
case control.GetShardEvacuationStatusResponse_Body_RUNNING:
status = "running"
default:
status = "undefined"
}
sb.WriteString(fmt.Sprintf(" Status: %s.", status))
}
func appendShardIDs(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
sb.WriteString("Shard IDs: ")
for idx, shardID := range resp.GetBody().GetShard_ID() {
shardIDStr := shard.NewIDFromBytes(shardID).String()
if idx > 0 {
sb.WriteString(", ")
}
sb.WriteString(shardIDStr)
if idx == len(resp.GetBody().GetShard_ID())-1 {
sb.WriteString(".")
}
}
}
func appendCounts(sb *strings.Builder, resp *control.GetShardEvacuationStatusResponse) {
sb.WriteString(fmt.Sprintf(" Evacuated %d object out of %d, failed to evacuate %d objects.",
resp.GetBody().GetEvacuated(),
resp.Body.GetTotal(),
resp.Body.GetFailed()))
}
func initControlEvacuationShardCmd() {
evacuationShardCmd.AddCommand(startEvacuationShardCmd)
evacuationShardCmd.AddCommand(getEvacuationShardStatusCmd)
evacuationShardCmd.AddCommand(stopEvacuationShardCmd)
initControlStartEvacuationShardCmd()
initControlFlags(getEvacuationShardStatusCmd)
initControlFlags(stopEvacuationShardCmd)
}
func initControlStartEvacuationShardCmd() {
initControlFlags(startEvacuationShardCmd)
flags := startEvacuationShardCmd.Flags()
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
flags.Bool(shardAllFlag, false, "Process all shards")
flags.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -13,13 +13,14 @@ var shardsCmd = &cobra.Command{
func initControlShardsCmd() {
shardsCmd.AddCommand(listShardsCmd)
shardsCmd.AddCommand(setShardModeCmd)
shardsCmd.AddCommand(evacuateShardCmd)
shardsCmd.AddCommand(evacuationShardCmd)
shardsCmd.AddCommand(flushCacheCmd)
shardsCmd.AddCommand(doctorCmd)
initControlShardsListCmd()
initControlSetShardModeCmd()
initControlEvacuateShardCmd()
initControlEvacuationShardCmd()
initControlFlushCacheCmd()
initControlDoctorCmd()
}

92
docs/evacuation.md Normal file
View file

@ -0,0 +1,92 @@
# Shard data evacuation
## Overview
Evacuation is the process of transferring data from one shard to another. Evacuation is used in case of problems with the shard in order to save data.
To start the evacuation, it is necessary that the shard is in read-only mode (read more [here](./shard-modes.md)).
First of all, by the evacuation the data is transferred to other shards of the same node; if it is not possible, then the data is transferred to other nodes.

Is it true, that if the one wants to migrate all the data out of a storage node, she needs to add all shards with a shardAllFlag toggled? If so, can we emphasize it in documentation?

Is it true, that if the one wants to migrate all the data out of a storage node, she needs to add all shards with a `shardAllFlag` toggled? If so, can we emphasize it in documentation?

Added to Commands section.

Added to `Commands` section.
Only one running evacuation process is allowed on the node at a time.
`frostfs-cli` utility is used to manage evacuation.
## Commands
`frostfs-cli control shards evacuation start` starts evacuation process for shards specified. To start evacuating all node shards, use the `--all` flag.
`frostfs-cli control shards evacuation stop` stops running evacuation process.
`frostfs-cli control shards evacuation status` prints evacuation process status.
See commands `--help` output for detailed description.
## Examples
### Set shard mode to read only
```bash
frostfs-cli control shards set-mode --mode read-only --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 8kEBwtvKLU3Hva3PaaodUi
Enter password >
Shard mode update request successfully sent.
```
### Start evacuation and get status
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 8kEBwtvKLU3Hva3PaaodUi
Enter password >
Shard evacuation has been successfully started.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 14 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:03.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: running. Evacuated 23 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:05.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 8kEBwtvKLU3Hva3PaaodUi. Status: completed. Evacuated 61 object out of 61, failed to evacuate 0 objects. Started at: 2023-05-10T10:13:06Z UTC. Duration: 00:00:13.
```
### Stop running evacuation process
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3
Enter password >
Shard evacuation has been successfully started.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 15 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:03.
frostfs-cli control shards evacuation stop --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Evacuation stopped.
frostfs-cli control shards evacuation status --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json
Enter password >
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: completed. Evacuated 31 object out of 73, failed to evacuate 0 objects. Error: context canceled. Started at: 2023-05-10T10:15:47Z UTC. Duration: 00:00:07.
```
### Start evacuation and await it completes
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3 --await
Enter password >
Shard evacuation has been successfully started.
Progress will be reported every 5 seconds.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 18 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:04.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 43 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:09.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Status: running. Evacuated 68 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
Shard evacuation has been completed.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:18:42Z UTC. Duration: 00:00:14.
```
### Start evacuation and await it completes without progress notifications
```bash
frostfs-cli control shards evacuation start --endpoint s01.frostfs.devenv:8081 --wallet ./../frostfs-dev-env/services/storage/wallet01.json --id 54Y8aot9uc7BSadw2XtYr3 --await --no-progress
Enter password >
Shard evacuation has been successfully started.
Shard evacuation has been completed.
Shard IDs: 54Y8aot9uc7BSadw2XtYr3. Evacuated 73 object out of 73, failed to evacuate 0 objects. Started at: 2023-05-10T10:20:00Z UTC. Duration: 00:00:14.
```

View file

@ -484,5 +484,9 @@ const (
ShardGCCollectingExpiredLocksCompleted = "collecting expired locks completed"
ShardGCRemoveGarbageStarted = "garbage remove started"
ShardGCRemoveGarbageCompleted = "garbage remove completed"
EngineShardsEvacuationFailedToCount = "failed to get total objects count to evacuate"
EngineShardsEvacuationFailedToListObjects = "failed to list objects to evacuate"
EngineShardsEvacuationFailedToReadObject = "failed to read object to evacuate"
EngineShardsEvacuationFailedToMoveObject = "failed to evacuate object to other node"
ShardGCFailedToGetExpiredWithLinked = "failed to get expired objects with linked"
)

View file

@ -35,6 +35,7 @@ type StorageEngine struct {
err error
}
evacuateLimiter *evacuationLimiter
}
type shardWrapper struct {
@ -230,6 +231,9 @@ func New(opts ...Option) *StorageEngine {
shardPools: make(map[string]util.WorkerPool),
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{
guard: &sync.RWMutex{},
},
}
}

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -14,6 +15,9 @@ import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -24,11 +28,23 @@ type EvacuateShardPrm struct {
shardID []*shard.ID
handler func(context.Context, oid.Address, *objectSDK.Object) error
ignoreErrors bool
async bool
}
// EvacuateShardRes represents result of the EvacuateShard operation.
type EvacuateShardRes struct {
count int
evacuated *atomic.Uint64
total *atomic.Uint64
failed *atomic.Uint64
}
// NewEvacuateShardRes creates new EvacuateShardRes instance.
func NewEvacuateShardRes() *EvacuateShardRes {
return &EvacuateShardRes{
evacuated: atomic.NewUint64(0),
total: atomic.NewUint64(0),
failed: atomic.NewUint64(0),
}
}
// WithShardIDList sets shard ID.
@ -46,10 +62,46 @@ func (p *EvacuateShardPrm) WithFaultHandler(f func(context.Context, oid.Address,
p.handler = f
}
// Count returns amount of evacuated objects.
// WithAsync sets flag to run evacuate async.
func (p *EvacuateShardPrm) WithAsync(async bool) {
p.async = async
}
// Evacuated returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
func (p EvacuateShardRes) Count() int {
return p.count
func (p *EvacuateShardRes) Evacuated() uint64 {
if p == nil {
return 0
}
return p.evacuated.Load()
}
// Total returns total count objects to evacuate.
func (p *EvacuateShardRes) Total() uint64 {
if p == nil {
return 0
}
return p.total.Load()
}
// Failed returns count of failed objects to evacuate.
func (p *EvacuateShardRes) Failed() uint64 {
if p == nil {
return 0
}
return p.failed.Load()
}
// DeepCopy returns deep copy of result instance.
func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
if p == nil {
return nil
}
return &EvacuateShardRes{
evacuated: atomic.NewUint64(p.evacuated.Load()),
total: atomic.NewUint64(p.total.Load()),
failed: atomic.NewUint64(p.failed.Load()),
}
}
const defaultEvacuateBatchSize = 100
@ -63,15 +115,29 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (EvacuateShardRes, error) {
func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*EvacuateShardRes, error) {
select {

Could you explain, please, why have you put these select-s at the beginning of the methods?
For me this select looks inconvinient. This check should be performed by the code that runs something asynchroniously (like the errgroup below)

Could you explain, please, why have you put these `select`-s at the beginning of the methods? For me this `select` looks inconvinient. This check should be performed by the code that runs something asynchroniously (like the errgroup below)

RPC call has deadline, so <-ctx.Done() can be true.

RPC call has deadline, so `<-ctx.Done()` can be true.

can be true

No doubt.

But why don't we rely on the context cancellation on errgroup.Go invocation level?

> can be true No doubt. But why don't we rely on the context cancellation on `errgroup.Go` invocation level?

errGroup can use detached context (context.Background()) in case of async execution, so this check will be the only one for ctx.Done

errGroup can use detached context (context.Background()) in case of async execution, so this check will be the only one for ctx.Done
case <-ctx.Done():
return nil, ctx.Err()
default:
}
shardIDs := make([]string, len(prm.shardID))
for i := range prm.shardID {
shardIDs[i] = prm.shardID[i].String()
}
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Evacuate",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.async),
attribute.Bool("ignoreErrors", prm.ignoreErrors),
))
defer span.End()
shards, weights, err := e.getActualShards(shardIDs, prm.handler != nil)
if err != nil {
return EvacuateShardRes{}, err
return nil, err
}
shardsToEvacuate := make(map[string]*shard.Shard)
@ -83,23 +149,91 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (Eva
}
}
res := NewEvacuateShardRes()
ctx = ctxOrBackground(ctx, prm.async)
eg, egCtx, err := e.evacuateLimiter.TryStart(ctx, shardIDs, res)
if err != nil {
return nil, err
}
eg.Go(func() error {
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, weights, shardsToEvacuate)
})
if prm.async {
return nil, nil
}
return res, eg.Wait()
}

Is it possible to squash it in one line?

Is it possible to squash it in one line?

fixed.

fixed.
func ctxOrBackground(ctx context.Context, background bool) context.Context {
if background {
return context.Background()
}
return ctx
}
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
var err error
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
trace.WithAttributes(
attribute.StringSlice("shardIDs", shardIDs),
attribute.Bool("async", prm.async),
attribute.Bool("ignoreErrors", prm.ignoreErrors),
))
defer func() {
span.End()
e.evacuateLimiter.Complete(err)
}()
e.log.Info(logs.EngineStartedShardsEvacuation, zap.Strings("shard_ids", shardIDs))
var res EvacuateShardRes
err = e.getTotalObjectsCount(ctx, shardsToEvacuate, res)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToCount, zap.Strings("shard_ids", shardIDs), zap.Error(err))
return err
}
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, &res, shards, weights, shardsToEvacuate); err != nil {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, weights, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs))
return res, err
return err
}
}
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, zap.Strings("shard_ids", shardIDs))
return res, nil
return nil
}
func (e *StorageEngine) getTotalObjectsCount(ctx context.Context, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.getTotalObjectsCount")
defer span.End()
for _, sh := range shardsToEvacuate {
cnt, err := sh.LogicalObjectsCount(ctx)
if err != nil {
if errors.Is(err, shard.ErrDegradedMode) {
continue
}
return err
}
res.total.Add(cnt)
}
return nil
}
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
trace.WithAttributes(
attribute.String("shardID", shardID),
))
defer span.End()
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
@ -116,6 +250,7 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
}
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err))
return err
}
@ -168,6 +303,12 @@ func (e *StorageEngine) getActualShards(shardIDs []string, handlerDefined bool)
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.AddressWithType, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
trace.WithAttributes(
attribute.Int("objects_count", len(toEvacuate)),
))
defer span.End()
for i := range toEvacuate {
select {
case <-ctx.Done():
@ -182,12 +323,14 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
getRes, err := sh.Get(ctx, getPrm)
if err != nil {
if prm.ignoreErrors {
res.failed.Inc()
continue
}
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
return err
}
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, res, shards, weights, shardsToEvacuate)
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, weights, shardsToEvacuate, res)
if err != nil {
return err
}
@ -204,15 +347,16 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
err = prm.handler(ctx, addr, getRes.Object())
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err))
return err
}
res.count++
res.evacuated.Inc()
}
return nil
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, res *EvacuateShardRes,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard) (bool, error) {
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
shards []pooledShard, weights []float64, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes) (bool, error) {
hrw.SortHasherSliceByWeightValue(shards, weights, hrw.Hash([]byte(addr.EncodeToString())))
for j := range shards {
select {
@ -227,11 +371,11 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
putDone, exists := e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object)
if putDone || exists {
if putDone {
res.evacuated.Inc()
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
zap.Stringer("from", sh.ID()),
zap.Stringer("to", shards[j].ID()),
zap.Stringer("addr", addr))
res.count++
}
return true, nil
}
@ -239,3 +383,23 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
return false, nil
}
func (e *StorageEngine) GetEvacuationState(ctx context.Context) (*EvacuationState, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return e.evacuateLimiter.GetState(), nil
}
func (e *StorageEngine) EnqueRunningEvacuationStop(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return e.evacuateLimiter.CancelIfRunning()
}

View file

@ -0,0 +1,178 @@
package engine
import (
"context"
"fmt"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"golang.org/x/sync/errgroup"
)
type EvacuateProcessState int
const (
EvacuateProcessStateUndefined EvacuateProcessState = iota
EvacuateProcessStateRunning
EvacuateProcessStateCompleted
)
type EvacuationState struct {
shardIDs []string
processState EvacuateProcessState
startedAt time.Time
finishedAt time.Time
result *EvacuateShardRes
errMessage string
}
func (s *EvacuationState) ShardIDs() []string {
if s == nil {
return nil
}
return s.shardIDs
}
func (s *EvacuationState) Evacuated() uint64 {
if s == nil {
return 0
}
return s.result.Evacuated()
}
func (s *EvacuationState) Total() uint64 {
if s == nil {
return 0
}
return s.result.Total()
}
func (s *EvacuationState) Failed() uint64 {
if s == nil {
return 0
}
return s.result.Failed()
}
func (s *EvacuationState) ProcessingStatus() EvacuateProcessState {
if s == nil {
return EvacuateProcessStateUndefined
}
return s.processState
}
func (s *EvacuationState) StartedAt() *time.Time {
if s == nil {
return nil
}
defaultTime := time.Time{}
if s.startedAt == defaultTime {
return nil
}
return &s.startedAt
}
func (s *EvacuationState) FinishedAt() *time.Time {
if s == nil {
return nil
}
defaultTime := time.Time{}
if s.finishedAt == defaultTime {
return nil
}
return &s.finishedAt
}
func (s *EvacuationState) ErrorMessage() string {
if s == nil {
return ""
}
return s.errMessage
}
func (s *EvacuationState) DeepCopy() *EvacuationState {
if s == nil {
return nil
}
shardIDs := make([]string, len(s.shardIDs))
fyrchik marked this conversation as resolved Outdated

Do we need to specify client polling interval here in the engine?

Do we need to specify _client_ polling interval here in the engine?

To reduce the load, the server can increase this interval. But now it's constant.
Inspired by OAuth2 device code flow: https://learn.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-device-code#device-authorization-response

To reduce the load, the server can increase this interval. But now it's constant. Inspired by OAuth2 device code flow: https://learn.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-device-code#device-authorization-response

Ok, dropped

Ok, dropped
copy(shardIDs, s.shardIDs)
return &EvacuationState{
shardIDs: shardIDs,
processState: s.processState,
startedAt: s.startedAt,
finishedAt: s.finishedAt,
errMessage: s.errMessage,
fyrchik marked this conversation as resolved Outdated

Isn't it read-only?

Isn't it read-only?

It is. But since the method is called DeepCopy, then the copy must be deep.

It is. But since the method is called DeepCopy, then the copy must be deep.
result: s.result.DeepCopy(),
}
}
type evacuationLimiter struct {
state EvacuationState
eg *errgroup.Group
cancel context.CancelFunc
guard *sync.RWMutex
}
func (l *evacuationLimiter) TryStart(ctx context.Context, shardIDs []string, result *EvacuateShardRes) (*errgroup.Group, context.Context, error) {
l.guard.Lock()
defer l.guard.Unlock()
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}
if l.state.processState == EvacuateProcessStateRunning {
return nil, nil, logicerr.New(fmt.Sprintf("evacuate is already running for shard ids %v", l.state.shardIDs))
}
var egCtx context.Context
egCtx, l.cancel = context.WithCancel(ctx)
l.eg, egCtx = errgroup.WithContext(egCtx)
l.state = EvacuationState{
shardIDs: shardIDs,
processState: EvacuateProcessStateRunning,
startedAt: time.Now().UTC(),
result: result,
}
return l.eg, egCtx, nil
}
func (l *evacuationLimiter) Complete(err error) {
l.guard.Lock()
defer l.guard.Unlock()
errMsq := ""
if err != nil {
errMsq = err.Error()
}
l.state.processState = EvacuateProcessStateCompleted
l.state.errMessage = errMsq
l.state.finishedAt = time.Now().UTC()
l.eg = nil
}
func (l *evacuationLimiter) GetState() *EvacuationState {
l.guard.RLock()
defer l.guard.RUnlock()
return l.state.DeepCopy()
}
func (l *evacuationLimiter) CancelIfRunning() error {
l.guard.Lock()
defer l.guard.Unlock()
if l.state.processState != EvacuateProcessStateRunning {
return logicerr.New("there is no running evacuation task")

Why do we need deepcopy here? Pointers to atomics are ok to copy.

Why do we need deepcopy here? Pointers to atomics are ok to copy.

There were two ways to ensure consistency: mutex inside the structure or deep copy. I chose the second one. This is my engineering decision.

There were two ways to ensure consistency: mutex inside the structure or deep copy. I chose the second one. This is my engineering decision.

Could you elaborate, what consistency issues do we have if we do not do a deep copy?

Could you elaborate, what consistency issues do we have if we do not do a deep copy?
func (l *evacuationLimiter) Complete(err error) {
	l.guard.Lock()
	defer l.guard.Unlock()

	errMsq := ""
	if err != nil {
		errMsq = err.Error()
	}
	l.state.processState = EvacuateProcessStateCompleted
	l.state.errMessage = errMsq
	l.state.finishedAt = time.Now().UTC()

	l.eg = nil
}

func (l *evacuationLimiter) GetState() *EvacuationState {
	l.guard.RLock()
	defer l.guard.RUnlock()

	return l.state.DeepCopy()
}

state can change by evacuation goroutine, so we can get completed state without error for example.

``` func (l *evacuationLimiter) Complete(err error) { l.guard.Lock() defer l.guard.Unlock() errMsq := "" if err != nil { errMsq = err.Error() } l.state.processState = EvacuateProcessStateCompleted l.state.errMessage = errMsq l.state.finishedAt = time.Now().UTC() l.eg = nil } func (l *evacuationLimiter) GetState() *EvacuationState { l.guard.RLock() defer l.guard.RUnlock() return l.state.DeepCopy() } ``` `state` can change by evacuation goroutine, so we can get completed state without error for example.
}
l.cancel()
return nil
}

View file

@ -7,6 +7,7 @@ import (
"path/filepath"
"strconv"
"testing"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@ -21,6 +22,7 @@ import (
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"
)
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
@ -103,14 +105,14 @@ func TestEvacuateShard(t *testing.T) {
t.Run("must be read-only", func(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, ErrMustBeReadOnly)
require.Equal(t, 0, res.Count())
require.Equal(t, uint64(0), res.Evacuated())
})
require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly))
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, objPerShard, res.count)
require.Equal(t, uint64(objPerShard), res.Evacuated())
// We check that all objects are available both before and after shard removal.
// First case is a real-world use-case. It ensures that an object can be put in presense
@ -121,7 +123,7 @@ func TestEvacuateShard(t *testing.T) {
// Calling it again is OK, but all objects are already moved, so no new PUTs should be done.
res, err = e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, 0, res.count)
require.Equal(t, uint64(0), res.Evacuated())
checkHasObjects(t)
@ -138,8 +140,8 @@ func TestEvacuateNetwork(t *testing.T) {
var errReplication = errors.New("handler error")
acceptOneOf := func(objects []*objectSDK.Object, max int) func(context.Context, oid.Address, *objectSDK.Object) error {
var n int
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) error {
var n uint64
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) error {
if n == max {
return errReplication
@ -169,13 +171,13 @@ func TestEvacuateNetwork(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errMustHaveTwoShards)
require.Equal(t, 0, res.Count())
require.Equal(t, uint64(0), res.Evacuated())
prm.handler = acceptOneOf(objects, 2)
res, err = e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, 2, res.Count())
require.Equal(t, uint64(2), res.Evacuated())
})
t.Run("multiple shards, evacuate one", func(t *testing.T) {
t.Parallel()
@ -190,14 +192,14 @@ func TestEvacuateNetwork(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, 2, res.Count())
require.Equal(t, uint64(2), res.Evacuated())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, 3)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, 3, res.Count())
require.Equal(t, uint64(3), res.Evacuated())
})
})
t.Run("multiple shards, evacuate many", func(t *testing.T) {
@ -205,12 +207,12 @@ func TestEvacuateNetwork(t *testing.T) {
e, ids, objects := newEngineEvacuate(t, 4, 5)
evacuateIDs := ids[0:3]
var totalCount int
var totalCount uint64
for i := range evacuateIDs {
res, err := e.shards[ids[i].String()].List()
require.NoError(t, err)
totalCount += len(res.AddressList())
totalCount += uint64(len(res.AddressList()))
}
for i := range ids {
@ -223,14 +225,14 @@ func TestEvacuateNetwork(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm)
require.ErrorIs(t, err, errReplication)
require.Equal(t, totalCount-1, res.Count())
require.Equal(t, totalCount-1, res.Evacuated())
t.Run("no errors", func(t *testing.T) {
prm.handler = acceptOneOf(objects, totalCount)
res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err)
require.Equal(t, totalCount, res.Count())
require.Equal(t, totalCount, res.Evacuated())
})
})
}
@ -258,5 +260,114 @@ func TestEvacuateCancellation(t *testing.T) {
res, err := e.Evacuate(ctx, prm)
require.ErrorContains(t, err, "context canceled")
require.Equal(t, 0, res.Count())
require.Equal(t, uint64(0), res.Evacuated())
}
func TestEvacuateSingleProcess(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
blocker := make(chan interface{})
running := make(chan interface{})
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
default:
close(running)
}
<-blocker
return nil
}
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated())
return nil
})
eg.Go(func() error {
<-running
res, err := e.Evacuate(egCtx, prm)
require.ErrorContains(t, err, "evacuate is already running for shard ids", "second evacuation not failed")
require.Equal(t, uint64(0), res.Evacuated())
close(blocker)
return nil
})
require.NoError(t, eg.Wait())
}
func TestEvacuateAsync(t *testing.T) {
e, ids, _ := newEngineEvacuate(t, 2, 3)
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
blocker := make(chan interface{})
running := make(chan interface{})
var prm EvacuateShardPrm
prm.shardID = ids[1:2]
prm.handler = func(ctx context.Context, a oid.Address, o *objectSDK.Object) error {
select {
case <-running:
default:
fyrchik marked this conversation as resolved Outdated

We set it only one time, what about closing the channel instead?

We set it only one time, what about closing the channel instead?

fixed

fixed
close(running)
}
<-blocker
return nil
}
st, err := e.GetEvacuationState(context.Background())
require.NoError(t, err, "get init state failed")
require.Equal(t, EvacuateProcessStateUndefined, st.ProcessingStatus(), "invalid init state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid init count")
require.Nil(t, st.StartedAt(), "invalid init started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
require.ElementsMatch(t, []string{}, st.ShardIDs(), "invalid init shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
eg, egCtx := errgroup.WithContext(context.Background())
eg.Go(func() error {
res, err := e.Evacuate(egCtx, prm)
require.NoError(t, err, "first evacuation failed")
require.Equal(t, uint64(3), res.Evacuated())
return nil
})
<-running
st, err = e.GetEvacuationState(context.Background())
require.NoError(t, err, "get running state failed")
require.Equal(t, EvacuateProcessStateRunning, st.ProcessingStatus(), "invalid running state")
require.Equal(t, uint64(0), st.Evacuated(), "invalid running count")
require.NotNil(t, st.StartedAt(), "invalid running started at")
require.Nil(t, st.FinishedAt(), "invalid init finished at")
expectedShardIDs := make([]string, 0, 2)
for _, id := range ids[1:2] {
expectedShardIDs = append(expectedShardIDs, id.String())
}
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid running shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid init error message")
close(blocker)
require.Eventually(t, func() bool {
st, err = e.GetEvacuationState(context.Background())
return st.ProcessingStatus() == EvacuateProcessStateCompleted
}, 3*time.Second, 10*time.Millisecond, "invalid final state")
require.NoError(t, err, "get final state failed")
require.Equal(t, uint64(3), st.Evacuated(), "invalid final count")
require.NotNil(t, st.StartedAt(), "invalid final started at")
require.NotNil(t, st.FinishedAt(), "invalid final finished at")
require.ElementsMatch(t, expectedShardIDs, st.ShardIDs(), "invalid final shard ids")
require.Equal(t, "", st.ErrorMessage(), "invalid final error message")
require.NoError(t, eg.Wait())
}

View file

@ -0,0 +1,31 @@
package shard
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// LogicalObjectsCount returns logical objects count.
func (s *Shard) LogicalObjectsCount(ctx context.Context) (uint64, error) {
_, span := tracing.StartSpanFromContext(ctx, "Shard.LogicalObjectsCount",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.GetMode().NoMetabase() {
return 0, ErrDegradedMode
}
cc, err := s.metaBase.ObjectCounters()
if err != nil {
return 0, err
}
return cc.Logic(), nil
}

View file

@ -8,15 +8,18 @@ import (
const serviceName = "control.ControlService"
const (
rpcHealthCheck = "HealthCheck"
rpcSetNetmapStatus = "SetNetmapStatus"
rpcDropObjects = "DropObjects"
rpcListShards = "ListShards"
rpcSetShardMode = "SetShardMode"
rpcSynchronizeTree = "SynchronizeTree"
rpcEvacuateShard = "EvacuateShard"
rpcFlushCache = "FlushCache"
rpcDoctor = "Doctor"
rpcHealthCheck = "HealthCheck"
rpcSetNetmapStatus = "SetNetmapStatus"
rpcDropObjects = "DropObjects"
rpcListShards = "ListShards"
rpcSetShardMode = "SetShardMode"
rpcSynchronizeTree = "SynchronizeTree"
rpcEvacuateShard = "EvacuateShard"
rpcStartEvacuateShard = "StartEvacuateShard"
rpcGetEvacuateShardStatus = "GetEvacuateShardStatus"
rpcStopEvacuateShardStatus = "StopEvacuateShard"
rpcFlushCache = "FlushCache"
rpcDoctor = "Doctor"
)
// HealthCheck executes ControlService.HealthCheck RPC.
@ -141,6 +144,45 @@ func EvacuateShard(cli *client.Client, req *EvacuateShardRequest, opts ...client
return wResp.message, nil
}
// StartEvacuateShard executes ControlService.StartEvacuateShard RPC.
func StartEvacuateShard(cli *client.Client, req *StartShardEvacuationRequest, opts ...client.CallOption) (*StartShardEvacuationResponse, error) {
wResp := newResponseWrapper[StartShardEvacuationResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStartEvacuateShard), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}
// GetEvacuateShardStatus executes ControlService.GetEvacuateShardStatus RPC.
func GetEvacuateShardStatus(cli *client.Client, req *GetShardEvacuationStatusRequest, opts ...client.CallOption) (*GetShardEvacuationStatusResponse, error) {
wResp := newResponseWrapper[GetShardEvacuationStatusResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcGetEvacuateShardStatus), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}
// StopEvacuateShard executes ControlService.StopEvacuateShard RPC.
func StopEvacuateShard(cli *client.Client, req *StopShardEvacuationRequest, opts ...client.CallOption) (*StopShardEvacuationResponse, error) {
wResp := newResponseWrapper[StopShardEvacuationResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStopEvacuateShardStatus), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}
// FlushCache executes ControlService.FlushCache RPC.
func FlushCache(cli *client.Client, req *FlushCacheRequest, opts ...client.CallOption) (*FlushCacheResponse, error) {
wResp := newResponseWrapper[FlushCacheResponse]()

View file

@ -0,0 +1,60 @@
package control
import (
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"github.com/mr-tron/base58"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func stateToResponse(state *engine.EvacuationState) (*control.GetShardEvacuationStatusResponse, error) {
shardIDs := make([][]byte, 0, len(state.ShardIDs()))
for _, shID := range state.ShardIDs() {
id, err := base58.Decode(shID)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("invalid shard id format: %s", shID))
}
shardIDs = append(shardIDs, id)
}
var evacStatus control.GetShardEvacuationStatusResponse_Body_Status
switch state.ProcessingStatus() {
case engine.EvacuateProcessStateRunning:
evacStatus = control.GetShardEvacuationStatusResponse_Body_RUNNING
case engine.EvacuateProcessStateCompleted:
evacStatus = control.GetShardEvacuationStatusResponse_Body_COMPLETED
default:
evacStatus = control.GetShardEvacuationStatusResponse_Body_EVACUATE_SHARD_STATUS_UNDEFINED
}
var startedAt *control.GetShardEvacuationStatusResponse_Body_UnixTimestamp
if state.StartedAt() != nil {
startedAt = &control.GetShardEvacuationStatusResponse_Body_UnixTimestamp{
Value: state.StartedAt().Unix(),
}
}
var duration *control.GetShardEvacuationStatusResponse_Body_Duration
if state.StartedAt() != nil {

Is it possible to use one if statement here and above?

Is it possible to use one `if` statement here and above?

It is possible. But first if is for startedAt value, and second if for duration value. It's easier for me this way, i'm old.

It is possible. But first `if` is for `startedAt` value, and second `if` for `duration` value. It's easier for me this way, i'm old.
end := time.Now().UTC()
if state.FinishedAt() != nil {
end = *state.FinishedAt()
}
duration = &control.GetShardEvacuationStatusResponse_Body_Duration{
Seconds: int64(end.Sub(*state.StartedAt()).Seconds()),
}
}
return &control.GetShardEvacuationStatusResponse{
Body: &control.GetShardEvacuationStatusResponse_Body{
Shard_ID: shardIDs,
Evacuated: state.Evacuated(),
Total: state.Total(),
Failed: state.Failed(),
Status: evacStatus,
StartedAt: startedAt,
Duration: duration,
ErrorMessage: state.ErrorMessage(),
},
}, nil
}

View file

@ -37,7 +37,7 @@ func (s *Server) EvacuateShard(ctx context.Context, req *control.EvacuateShardRe
resp := &control.EvacuateShardResponse{
Body: &control.EvacuateShardResponse_Body{
Count: uint32(res.Count()),
Count: uint32(res.Evacuated()),
},
}

View file

@ -0,0 +1,97 @@
package control
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartShardEvacuationRequest) (*control.StartShardEvacuationResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
var prm engine.EvacuateShardPrm
prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
prm.WithFaultHandler(s.replicate)
prm.WithAsync(true)

For me it's obvios that WithAsync sets async flag and there's no need to pass the boolean argument - just WithAsync() - WDYT?

For me it's obvios that `WithAsync` sets `async` flag and there's no need to pass the boolean argument - just `WithAsync()` - WDYT?

It is usually more convenient because:

  1. Easier to pass down the stream (WithAsync(async) instead of if async { WithAsync }.
  2. Easier to override (e.g. we have defaults in tests, but don't want Async for one specific test).
It is usually more convenient because: 1. Easier to pass down the stream (`WithAsync(async)` instead of `if async { WithAsync }`. 2. Easier to override (e.g. we have defaults in tests, but don't want Async for one specific test).
_, err = s.s.Evacuate(ctx, prm)
if err != nil {
var logicalErr logicerr.Logical
if errors.As(err, &logicalErr) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.StartShardEvacuationResponse{
Body: &control.StartShardEvacuationResponse_Body{},
}
err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}
func (s *Server) GetShardEvacuationStatus(ctx context.Context, req *control.GetShardEvacuationStatusRequest) (*control.GetShardEvacuationStatusResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
state, err := s.s.GetEvacuationState(ctx)
if err != nil {
var logicalErr logicerr.Logical
if errors.As(err, &logicalErr) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
resp, err := stateToResponse(state)
if err != nil {
return nil, err
}
err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}
func (s *Server) StopShardEvacuation(ctx context.Context, req *control.StopShardEvacuationRequest) (*control.StopShardEvacuationResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
err = s.s.EnqueRunningEvacuationStop(ctx)
if err != nil {
var logicalErr logicerr.Logical
if errors.As(err, &logicalErr) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.StopShardEvacuationResponse{
Body: &control.StopShardEvacuationResponse_Body{},
}
err = SignMessage(s.key, resp)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return resp, nil
}

File diff suppressed because it is too large Load diff

View file

@ -27,8 +27,18 @@ service ControlService {
rpc SynchronizeTree (SynchronizeTreeRequest) returns (SynchronizeTreeResponse);
// EvacuateShard moves all data from one shard to the others.
// Deprecated: Use StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
rpc EvacuateShard (EvacuateShardRequest) returns (EvacuateShardResponse);
// StartShardEvacuation starts moving all data from one shard to the others.
rpc StartShardEvacuation (StartShardEvacuationRequest) returns (StartShardEvacuationResponse);
fyrchik marked this conversation as resolved Outdated

Why is it evacuation in CLI and evacuate here?

Why is it `evacuation` in CLI and `evacuate` here?

fixed

fixed
// GetShardEvacuationStatus returns evacuation status.
rpc GetShardEvacuationStatus (GetShardEvacuationStatusRequest) returns (GetShardEvacuationStatusResponse);
// StopShardEvacuation stops moving all data from one shard to the others.
rpc StopShardEvacuation (StopShardEvacuationRequest) returns (StopShardEvacuationResponse);
// FlushCache moves all data from one shard to the others.
rpc FlushCache (FlushCacheRequest) returns (FlushCacheResponse);
@ -298,3 +308,97 @@ message DoctorResponse {
Body body = 1;
Signature signature = 2;
}
// StartShardEvacuation request.
message StartShardEvacuationRequest {
// Request body structure.
message Body {
// IDs of the shards.
repeated bytes shard_ID = 1;
// Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2;
}
Body body = 1;
Signature signature = 2;
}
// StartShardEvacuation response.
message StartShardEvacuationResponse {
// Response body structure.
message Body {}
Body body = 1;
Signature signature = 2;
}
// GetShardEvacuationStatus request.
message GetShardEvacuationStatusRequest {
// Request body structure.
message Body {}
Body body = 1;
Signature signature = 2;
}
// GetShardEvacuationStatus response.
message GetShardEvacuationStatusResponse {
// Response body structure.
message Body {
// Evacuate status enum.
enum Status {
EVACUATE_SHARD_STATUS_UNDEFINED = 0;
RUNNING = 1;
COMPLETED = 2;
}
// Unix timestamp value.
message UnixTimestamp {
int64 value = 1;
}
// Duration in seconds.
message Duration {
int64 seconds = 1;
}
// Total objects to evacuate count. The value is approximate, so evacuated + failed == total is not guaranteed after completion.
uint64 total = 1;
// Evacuated objects count.
uint64 evacuated = 2;
// Failed objects count.
uint64 failed = 3;
// Shard IDs.
repeated bytes shard_ID = 4;
// Evacuation process status.
Status status = 5;
// Evacuation process duration.
Duration duration = 6;
// Evacuation process started at timestamp.
UnixTimestamp started_at = 7;
// Error message if evacuation failed.
string error_message = 8;
}
Body body = 1;
Signature signature = 2;
}
// StopShardEvacuation request.
message StopShardEvacuationRequest {
// Request body structure.
message Body {}
Body body = 1;
Signature signature = 2;
}
// StopShardEvacuation response.
message StopShardEvacuationResponse {
// Response body structure.
message Body {}
Body body = 1;
Signature signature = 2;
}

View file

@ -1391,3 +1391,519 @@ func (x *DoctorResponse) ReadSignedData(buf []byte) ([]byte, error) {
func (x *DoctorResponse) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *StartShardEvacuationRequest_Body) StableSize() (size int) {
size += proto.RepeatedBytesSize(1, x.Shard_ID)
size += proto.BoolSize(2, x.IgnoreErrors)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *StartShardEvacuationRequest_Body) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
offset += proto.BoolMarshal(2, buf[offset:], x.IgnoreErrors)
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *StartShardEvacuationRequest) StableSize() (size int) {
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *StartShardEvacuationRequest) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *StartShardEvacuationRequest) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *StartShardEvacuationRequest) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *StartShardEvacuationRequest) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *StartShardEvacuationResponse_Body) StableSize() (size int) {
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *StartShardEvacuationResponse_Body) StableMarshal(buf []byte) []byte {
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *StartShardEvacuationResponse) StableSize() (size int) {
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *StartShardEvacuationResponse) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *StartShardEvacuationResponse) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *StartShardEvacuationResponse) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *StartShardEvacuationResponse) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetShardEvacuationStatusRequest_Body) StableSize() (size int) {
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetShardEvacuationStatusRequest_Body) StableMarshal(buf []byte) []byte {
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetShardEvacuationStatusRequest) StableSize() (size int) {
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetShardEvacuationStatusRequest) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *GetShardEvacuationStatusRequest) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *GetShardEvacuationStatusRequest) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *GetShardEvacuationStatusRequest) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetShardEvacuationStatusResponse_Body_UnixTimestamp) StableSize() (size int) {
size += proto.Int64Size(1, x.Value)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetShardEvacuationStatusResponse_Body_UnixTimestamp) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.Int64Marshal(1, buf[offset:], x.Value)
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetShardEvacuationStatusResponse_Body_Duration) StableSize() (size int) {
size += proto.Int64Size(1, x.Seconds)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetShardEvacuationStatusResponse_Body_Duration) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.Int64Marshal(1, buf[offset:], x.Seconds)
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetShardEvacuationStatusResponse_Body) StableSize() (size int) {
size += proto.UInt64Size(1, x.Total)
size += proto.UInt64Size(2, x.Evacuated)
size += proto.UInt64Size(3, x.Failed)
size += proto.RepeatedBytesSize(4, x.Shard_ID)
size += proto.EnumSize(5, int32(x.Status))
size += proto.NestedStructureSize(6, x.Duration)
size += proto.NestedStructureSize(7, x.StartedAt)
size += proto.StringSize(8, x.ErrorMessage)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetShardEvacuationStatusResponse_Body) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.UInt64Marshal(1, buf[offset:], x.Total)
offset += proto.UInt64Marshal(2, buf[offset:], x.Evacuated)
offset += proto.UInt64Marshal(3, buf[offset:], x.Failed)
offset += proto.RepeatedBytesMarshal(4, buf[offset:], x.Shard_ID)
offset += proto.EnumMarshal(5, buf[offset:], int32(x.Status))
offset += proto.NestedStructureMarshal(6, buf[offset:], x.Duration)
offset += proto.NestedStructureMarshal(7, buf[offset:], x.StartedAt)
offset += proto.StringMarshal(8, buf[offset:], x.ErrorMessage)
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *GetShardEvacuationStatusResponse) StableSize() (size int) {
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *GetShardEvacuationStatusResponse) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *GetShardEvacuationStatusResponse) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *GetShardEvacuationStatusResponse) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *GetShardEvacuationStatusResponse) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *StopShardEvacuationRequest_Body) StableSize() (size int) {
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *StopShardEvacuationRequest_Body) StableMarshal(buf []byte) []byte {
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *StopShardEvacuationRequest) StableSize() (size int) {
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *StopShardEvacuationRequest) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *StopShardEvacuationRequest) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *StopShardEvacuationRequest) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *StopShardEvacuationRequest) SetSignature(sig *Signature) {
x.Signature = sig
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *StopShardEvacuationResponse_Body) StableSize() (size int) {
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *StopShardEvacuationResponse_Body) StableMarshal(buf []byte) []byte {
return buf
}
// StableSize returns the size of x in protobuf format.
//
// Structures with the same field values have the same binary size.
func (x *StopShardEvacuationResponse) StableSize() (size int) {
size += proto.NestedStructureSize(1, x.Body)
size += proto.NestedStructureSize(2, x.Signature)
return size
}
// StableMarshal marshals x in protobuf binary format with stable field order.
//
// If buffer length is less than x.StableSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same binary format.
func (x *StopShardEvacuationResponse) StableMarshal(buf []byte) []byte {
if x == nil {
return []byte{}
}
if buf == nil {
buf = make([]byte, x.StableSize())
}
var offset int
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
return buf
}
// ReadSignedData fills buf with signed data of x.
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
//
// Returns any error encountered which did not allow writing the data completely.
// Otherwise, returns the buffer in which the data is written.
//
// Structures with the same field values have the same signed data.
func (x *StopShardEvacuationResponse) SignedDataSize() int {
return x.GetBody().StableSize()
}
// SignedDataSize returns size of the request signed data in bytes.
//
// Structures with the same field values have the same signed data size.
func (x *StopShardEvacuationResponse) ReadSignedData(buf []byte) ([]byte, error) {
return x.GetBody().StableMarshal(buf), nil
}
func (x *StopShardEvacuationResponse) SetSignature(sig *Signature) {
x.Signature = sig
}

View file

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.12.4
// - protoc-gen-go-grpc v1.3.0
// - protoc v3.21.9
// source: pkg/services/control/service.proto
package control
@ -18,6 +18,21 @@ import (
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
const (
ControlService_HealthCheck_FullMethodName = "/control.ControlService/HealthCheck"
ControlService_SetNetmapStatus_FullMethodName = "/control.ControlService/SetNetmapStatus"
ControlService_DropObjects_FullMethodName = "/control.ControlService/DropObjects"
ControlService_ListShards_FullMethodName = "/control.ControlService/ListShards"
ControlService_SetShardMode_FullMethodName = "/control.ControlService/SetShardMode"
ControlService_SynchronizeTree_FullMethodName = "/control.ControlService/SynchronizeTree"
ControlService_EvacuateShard_FullMethodName = "/control.ControlService/EvacuateShard"
ControlService_StartShardEvacuation_FullMethodName = "/control.ControlService/StartShardEvacuation"
ControlService_GetShardEvacuationStatus_FullMethodName = "/control.ControlService/GetShardEvacuationStatus"
ControlService_StopShardEvacuation_FullMethodName = "/control.ControlService/StopShardEvacuation"
ControlService_FlushCache_FullMethodName = "/control.ControlService/FlushCache"
ControlService_Doctor_FullMethodName = "/control.ControlService/Doctor"
)
// ControlServiceClient is the client API for ControlService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
@ -35,7 +50,14 @@ type ControlServiceClient interface {
// Synchronizes all log operations for the specified tree.
SynchronizeTree(ctx context.Context, in *SynchronizeTreeRequest, opts ...grpc.CallOption) (*SynchronizeTreeResponse, error)
// EvacuateShard moves all data from one shard to the others.
// Deprecated: Use StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
EvacuateShard(ctx context.Context, in *EvacuateShardRequest, opts ...grpc.CallOption) (*EvacuateShardResponse, error)
// StartShardEvacuation starts moving all data from one shard to the others.
StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error)
// GetShardEvacuationStatus returns evacuation status.
GetShardEvacuationStatus(ctx context.Context, in *GetShardEvacuationStatusRequest, opts ...grpc.CallOption) (*GetShardEvacuationStatusResponse, error)
// StopShardEvacuation stops moving all data from one shard to the others.
StopShardEvacuation(ctx context.Context, in *StopShardEvacuationRequest, opts ...grpc.CallOption) (*StopShardEvacuationResponse, error)
// FlushCache moves all data from one shard to the others.
FlushCache(ctx context.Context, in *FlushCacheRequest, opts ...grpc.CallOption) (*FlushCacheResponse, error)
// Doctor performs storage restructuring operations on engine.
@ -52,7 +74,7 @@ func NewControlServiceClient(cc grpc.ClientConnInterface) ControlServiceClient {
func (c *controlServiceClient) HealthCheck(ctx context.Context, in *HealthCheckRequest, opts ...grpc.CallOption) (*HealthCheckResponse, error) {
out := new(HealthCheckResponse)
err := c.cc.Invoke(ctx, "/control.ControlService/HealthCheck", in, out, opts...)
err := c.cc.Invoke(ctx, ControlService_HealthCheck_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -61,7 +83,7 @@ func (c *controlServiceClient) HealthCheck(ctx context.Context, in *HealthCheckR
func (c *controlServiceClient) SetNetmapStatus(ctx context.Context, in *SetNetmapStatusRequest, opts ...grpc.CallOption) (*SetNetmapStatusResponse, error) {
out := new(SetNetmapStatusResponse)
err := c.cc.Invoke(ctx, "/control.ControlService/SetNetmapStatus", in, out, opts...)
err := c.cc.Invoke(ctx, ControlService_SetNetmapStatus_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -70,7 +92,7 @@ func (c *controlServiceClient) SetNetmapStatus(ctx context.Context, in *SetNetma
func (c *controlServiceClient) DropObjects(ctx context.Context, in *DropObjectsRequest, opts ...grpc.CallOption) (*DropObjectsResponse, error) {
out := new(DropObjectsResponse)
err := c.cc.Invoke(ctx, "/control.ControlService/DropObjects", in, out, opts...)
err := c.cc.Invoke(ctx, ControlService_DropObjects_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -79,7 +101,7 @@ func (c *controlServiceClient) DropObjects(ctx context.Context, in *DropObjectsR
func (c *controlServiceClient) ListShards(ctx context.Context, in *ListShardsRequest, opts ...grpc.CallOption) (*ListShardsResponse, error) {
out := new(ListShardsResponse)
err := c.cc.Invoke(ctx, "/control.ControlService/ListShards", in, out, opts...)
err := c.cc.Invoke(ctx, ControlService_ListShards_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -88,7 +110,7 @@ func (c *controlServiceClient) ListShards(ctx context.Context, in *ListShardsReq
func (c *controlServiceClient) SetShardMode(ctx context.Context, in *SetShardModeRequest, opts ...grpc.CallOption) (*SetShardModeResponse, error) {
out := new(SetShardModeResponse)
err := c.cc.Invoke(ctx, "/control.ControlService/SetShardMode", in, out, opts...)
err := c.cc.Invoke(ctx, ControlService_SetShardMode_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -97,7 +119,7 @@ func (c *controlServiceClient) SetShardMode(ctx context.Context, in *SetShardMod
func (c *controlServiceClient) SynchronizeTree(ctx context.Context, in *SynchronizeTreeRequest, opts ...grpc.CallOption) (*SynchronizeTreeResponse, error) {
out := new(SynchronizeTreeResponse)
err := c.cc.Invoke(ctx, "/control.ControlService/SynchronizeTree", in, out, opts...)
err := c.cc.Invoke(ctx, ControlService_SynchronizeTree_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -106,7 +128,34 @@ func (c *controlServiceClient) SynchronizeTree(ctx context.Context, in *Synchron
func (c *controlServiceClient) EvacuateShard(ctx context.Context, in *EvacuateShardRequest, opts ...grpc.CallOption) (*EvacuateShardResponse, error) {
out := new(EvacuateShardResponse)
err := c.cc.Invoke(ctx, "/control.ControlService/EvacuateShard", in, out, opts...)
err := c.cc.Invoke(ctx, ControlService_EvacuateShard_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlServiceClient) StartShardEvacuation(ctx context.Context, in *StartShardEvacuationRequest, opts ...grpc.CallOption) (*StartShardEvacuationResponse, error) {
out := new(StartShardEvacuationResponse)
err := c.cc.Invoke(ctx, ControlService_StartShardEvacuation_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlServiceClient) GetShardEvacuationStatus(ctx context.Context, in *GetShardEvacuationStatusRequest, opts ...grpc.CallOption) (*GetShardEvacuationStatusResponse, error) {
out := new(GetShardEvacuationStatusResponse)
err := c.cc.Invoke(ctx, ControlService_GetShardEvacuationStatus_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *controlServiceClient) StopShardEvacuation(ctx context.Context, in *StopShardEvacuationRequest, opts ...grpc.CallOption) (*StopShardEvacuationResponse, error) {
out := new(StopShardEvacuationResponse)
err := c.cc.Invoke(ctx, ControlService_StopShardEvacuation_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -115,7 +164,7 @@ func (c *controlServiceClient) EvacuateShard(ctx context.Context, in *EvacuateSh
func (c *controlServiceClient) FlushCache(ctx context.Context, in *FlushCacheRequest, opts ...grpc.CallOption) (*FlushCacheResponse, error) {
out := new(FlushCacheResponse)
err := c.cc.Invoke(ctx, "/control.ControlService/FlushCache", in, out, opts...)
err := c.cc.Invoke(ctx, ControlService_FlushCache_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -124,7 +173,7 @@ func (c *controlServiceClient) FlushCache(ctx context.Context, in *FlushCacheReq
func (c *controlServiceClient) Doctor(ctx context.Context, in *DoctorRequest, opts ...grpc.CallOption) (*DoctorResponse, error) {
out := new(DoctorResponse)
err := c.cc.Invoke(ctx, "/control.ControlService/Doctor", in, out, opts...)
err := c.cc.Invoke(ctx, ControlService_Doctor_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
@ -148,7 +197,14 @@ type ControlServiceServer interface {
// Synchronizes all log operations for the specified tree.
SynchronizeTree(context.Context, *SynchronizeTreeRequest) (*SynchronizeTreeResponse, error)
// EvacuateShard moves all data from one shard to the others.
// Deprecated: Use StartShardEvacuation/GetShardEvacuationStatus/StopShardEvacuation
EvacuateShard(context.Context, *EvacuateShardRequest) (*EvacuateShardResponse, error)
// StartShardEvacuation starts moving all data from one shard to the others.
StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error)
// GetShardEvacuationStatus returns evacuation status.
GetShardEvacuationStatus(context.Context, *GetShardEvacuationStatusRequest) (*GetShardEvacuationStatusResponse, error)
// StopShardEvacuation stops moving all data from one shard to the others.
StopShardEvacuation(context.Context, *StopShardEvacuationRequest) (*StopShardEvacuationResponse, error)
// FlushCache moves all data from one shard to the others.
FlushCache(context.Context, *FlushCacheRequest) (*FlushCacheResponse, error)
// Doctor performs storage restructuring operations on engine.
@ -180,6 +236,15 @@ func (UnimplementedControlServiceServer) SynchronizeTree(context.Context, *Synch
func (UnimplementedControlServiceServer) EvacuateShard(context.Context, *EvacuateShardRequest) (*EvacuateShardResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method EvacuateShard not implemented")
}
func (UnimplementedControlServiceServer) StartShardEvacuation(context.Context, *StartShardEvacuationRequest) (*StartShardEvacuationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StartShardEvacuation not implemented")
}
func (UnimplementedControlServiceServer) GetShardEvacuationStatus(context.Context, *GetShardEvacuationStatusRequest) (*GetShardEvacuationStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetShardEvacuationStatus not implemented")
}
func (UnimplementedControlServiceServer) StopShardEvacuation(context.Context, *StopShardEvacuationRequest) (*StopShardEvacuationResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method StopShardEvacuation not implemented")
}
func (UnimplementedControlServiceServer) FlushCache(context.Context, *FlushCacheRequest) (*FlushCacheResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method FlushCache not implemented")
}
@ -208,7 +273,7 @@ func _ControlService_HealthCheck_Handler(srv interface{}, ctx context.Context, d
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/control.ControlService/HealthCheck",
FullMethod: ControlService_HealthCheck_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).HealthCheck(ctx, req.(*HealthCheckRequest))
@ -226,7 +291,7 @@ func _ControlService_SetNetmapStatus_Handler(srv interface{}, ctx context.Contex
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/control.ControlService/SetNetmapStatus",
FullMethod: ControlService_SetNetmapStatus_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).SetNetmapStatus(ctx, req.(*SetNetmapStatusRequest))
@ -244,7 +309,7 @@ func _ControlService_DropObjects_Handler(srv interface{}, ctx context.Context, d
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/control.ControlService/DropObjects",
FullMethod: ControlService_DropObjects_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).DropObjects(ctx, req.(*DropObjectsRequest))
@ -262,7 +327,7 @@ func _ControlService_ListShards_Handler(srv interface{}, ctx context.Context, de
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/control.ControlService/ListShards",
FullMethod: ControlService_ListShards_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).ListShards(ctx, req.(*ListShardsRequest))
@ -280,7 +345,7 @@ func _ControlService_SetShardMode_Handler(srv interface{}, ctx context.Context,
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/control.ControlService/SetShardMode",
FullMethod: ControlService_SetShardMode_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).SetShardMode(ctx, req.(*SetShardModeRequest))
@ -298,7 +363,7 @@ func _ControlService_SynchronizeTree_Handler(srv interface{}, ctx context.Contex
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/control.ControlService/SynchronizeTree",
FullMethod: ControlService_SynchronizeTree_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).SynchronizeTree(ctx, req.(*SynchronizeTreeRequest))
@ -316,7 +381,7 @@ func _ControlService_EvacuateShard_Handler(srv interface{}, ctx context.Context,
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/control.ControlService/EvacuateShard",
FullMethod: ControlService_EvacuateShard_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).EvacuateShard(ctx, req.(*EvacuateShardRequest))
@ -324,6 +389,60 @@ func _ControlService_EvacuateShard_Handler(srv interface{}, ctx context.Context,
return interceptor(ctx, in, info, handler)
}
func _ControlService_StartShardEvacuation_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StartShardEvacuationRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlServiceServer).StartShardEvacuation(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlService_StartShardEvacuation_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).StartShardEvacuation(ctx, req.(*StartShardEvacuationRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlService_GetShardEvacuationStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetShardEvacuationStatusRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlServiceServer).GetShardEvacuationStatus(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlService_GetShardEvacuationStatus_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).GetShardEvacuationStatus(ctx, req.(*GetShardEvacuationStatusRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlService_StopShardEvacuation_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StopShardEvacuationRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ControlServiceServer).StopShardEvacuation(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ControlService_StopShardEvacuation_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).StopShardEvacuation(ctx, req.(*StopShardEvacuationRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ControlService_FlushCache_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(FlushCacheRequest)
if err := dec(in); err != nil {
@ -334,7 +453,7 @@ func _ControlService_FlushCache_Handler(srv interface{}, ctx context.Context, de
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/control.ControlService/FlushCache",
FullMethod: ControlService_FlushCache_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).FlushCache(ctx, req.(*FlushCacheRequest))
@ -352,7 +471,7 @@ func _ControlService_Doctor_Handler(srv interface{}, ctx context.Context, dec fu
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/control.ControlService/Doctor",
FullMethod: ControlService_Doctor_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ControlServiceServer).Doctor(ctx, req.(*DoctorRequest))
@ -395,6 +514,18 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
MethodName: "EvacuateShard",
Handler: _ControlService_EvacuateShard_Handler,
},
{
MethodName: "StartShardEvacuation",
Handler: _ControlService_StartShardEvacuation_Handler,
},
{
MethodName: "GetShardEvacuationStatus",
Handler: _ControlService_GetShardEvacuationStatus_Handler,
},
{
MethodName: "StopShardEvacuation",
Handler: _ControlService_StopShardEvacuation_Handler,
},
{
MethodName: "FlushCache",
Handler: _ControlService_FlushCache_Handler,

View file

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.12.4
// protoc v3.21.9
// source: pkg/services/control/types.proto
package control