Detach shard with frostfs-cli control shards detach
command #945
49
cmd/frostfs-cli/modules/control/detach_shards.go
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
package control
|
||||||
|
|
||||||
|
import (
|
||||||
|
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||||
|
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
var shardsDetachCmd = &cobra.Command{
|
||||||
|
Use: "detach",
|
||||||
|
Short: "Detach and close the shards",
|
||||||
|
Long: "Detach and close the shards",
|
||||||
|
Run: shardsDetach,
|
||||||
|
}
|
||||||
|
|
||||||
|
func shardsDetach(cmd *cobra.Command, _ []string) {
|
||||||
|
pk := key.Get(cmd)
|
||||||
|
|
||||||
|
req := &control.DetachShardsRequest{
|
||||||
|
Body: &control.DetachShardsRequest_Body{
|
||||||
|
Shard_ID: getShardIDListFromIDFlag(cmd, false),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
signRequest(cmd, pk, req)
|
||||||
|
|
||||||
|
cli := getClient(cmd, pk)
|
||||||
|
|
||||||
|
var resp *control.DetachShardsResponse
|
||||||
|
var err error
|
||||||
|
err = cli.ExecRaw(func(client *rawclient.Client) error {
|
||||||
|
resp, err = control.DetachShards(client, req)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||||
|
|
||||||
|
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||||
|
|
||||||
|
cmd.Println("Shard mode update request successfully sent.")
|
||||||
|
}
|
||||||
|
|
||||||
|
func initControlShardsDetachCmd() {
|
||||||
|
initControlFlags(shardsDetachCmd)
|
||||||
|
|
||||||
|
flags := shardsDetachCmd.Flags()
|
||||||
|
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||||
|
}
|
|
@ -18,6 +18,7 @@ func initControlShardsCmd() {
|
||||||
shardsCmd.AddCommand(flushCacheCmd)
|
shardsCmd.AddCommand(flushCacheCmd)
|
||||||
shardsCmd.AddCommand(doctorCmd)
|
shardsCmd.AddCommand(doctorCmd)
|
||||||
shardsCmd.AddCommand(writecacheShardCmd)
|
shardsCmd.AddCommand(writecacheShardCmd)
|
||||||
|
shardsCmd.AddCommand(shardsDetachCmd)
|
||||||
|
|
||||||
initControlShardsListCmd()
|
initControlShardsListCmd()
|
||||||
initControlSetShardModeCmd()
|
initControlSetShardModeCmd()
|
||||||
|
@ -26,4 +27,5 @@ func initControlShardsCmd() {
|
||||||
initControlFlushCacheCmd()
|
initControlFlushCacheCmd()
|
||||||
initControlDoctorCmd()
|
initControlDoctorCmd()
|
||||||
initControlShardsWritecacheCmd()
|
initControlShardsWritecacheCmd()
|
||||||
|
initControlShardsDetachCmd()
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,9 +145,17 @@ func getShardIDList(cmd *cobra.Command) [][]byte {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return getShardIDListFromIDFlag(cmd, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getShardIDListFromIDFlag(cmd *cobra.Command, withAllFlag bool) [][]byte {
|
||||||
sidList, _ := cmd.Flags().GetStringSlice(shardIDFlag)
|
sidList, _ := cmd.Flags().GetStringSlice(shardIDFlag)
|
||||||
if len(sidList) == 0 {
|
if len(sidList) == 0 {
|
||||||
|
if withAllFlag {
|
||||||
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("either --%s or --%s flag must be provided", shardIDFlag, shardAllFlag))
|
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("either --%s or --%s flag must be provided", shardIDFlag, shardAllFlag))
|
||||||
|
} else {
|
||||||
|
commonCmd.ExitOnErr(cmd, "", fmt.Errorf("--%s flag value must be provided", shardIDFlag))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// We can sort the ID list and perform this check without additional allocations,
|
// We can sort the ID list and perform this check without additional allocations,
|
||||||
|
|
|
@ -47,6 +47,7 @@
|
||||||
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
|
"FROSTFS_NODE_ADDRESSES":"127.0.0.1:8080",
|
||||||
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080",
|
"FROSTFS_GRPC_0_ENDPOINT":"127.0.0.1:8080",
|
||||||
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8081",
|
"FROSTFS_CONTROL_GRPC_ENDPOINT":"127.0.0.1:8081",
|
||||||
|
"FROSTFS_CONTROL_AUTHORIZED_KEYS":"031a6c6fbbdf02ca351745fa86b9ba5a9452d785ac4f7fc2b7548ca2a46c4fcf4a",
|
||||||
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
"FROSTFS_NODE_ATTRIBUTE_0":"User-Agent:FrostFS/dev",
|
||||||
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
"FROSTFS_NODE_ATTRIBUTE_1":"UN-LOCODE:RU MOW",
|
||||||
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-node-state",
|
"FROSTFS_NODE_PERSISTENT_STATE_PATH":"${workspaceFolder}/.cache/.frostfs-node-state",
|
||||||
|
|
|
@ -46,3 +46,9 @@ Shard can automatically switch to a `degraded-read-only` mode in 3 cases:
|
||||||
1. If the metabase was not available or couldn't be opened/initialized during shard startup.
|
1. If the metabase was not available or couldn't be opened/initialized during shard startup.
|
||||||
2. If shard error counter exceeds threshold.
|
2. If shard error counter exceeds threshold.
|
||||||
3. If the metabase couldn't be reopened during SIGHUP handling.
|
3. If the metabase couldn't be reopened during SIGHUP handling.
|
||||||
|
|
||||||
|
# Detach shard
|
||||||
|
|
||||||
|
To detach a shard use `frostfs-cli control shards detach` command. This command removes the shards from the storage
|
||||||
|
engine and closes all resources associated with the shards.
|
||||||
|
Limitation: `SIGHUP` or storage node restart lead to detached shard will be again online.
|
|
@ -2,7 +2,9 @@ package engine
|
||||||||
|
|
||||||||
import (
|
import (
|
||||||||
"context"
|
"context"
|
||||||||
|
"errors"
|
||||||||
"fmt"
|
"fmt"
|
||||||||
|
"sync"
|
||||||||
"sync/atomic"
|
"sync/atomic"
|
||||||||
|
|
||||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||||
|
@ -14,6 +16,7 @@ import (
|
||||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||||
"github.com/panjf2000/ants/v2"
|
"github.com/panjf2000/ants/v2"
|
||||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||||
|
"golang.org/x/sync/errgroup"
|
||||||||
)
|
)
|
||||||||
|
|
||||||||
var errShardNotFound = logicerr.New("shard not found")
|
var errShardNotFound = logicerr.New("shard not found")
|
||||||||
|
@ -344,6 +347,100 @@ func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
|
||||||||
}
|
}
|
||||||||
}
|
}
|
||||||||
|
|||||||||
|
|
||||||||
|
func (e *StorageEngine) DetachShards(ids []*shard.ID) error {
|
||||||||
|
if len(ids) == 0 {
|
||||||||
|
return logicerr.New("ids must be non-empty")
|
||||||||
|
}
|
||||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
If we first delete, then resources could leak, because after the error in If we first delete, then resources could leak, because after the error in `closeShards()` we will have untraceable, but existing shards
dstepanov-yadro
commented
There are two points here:
Also engine acquires lock to delete shards, but closing shard can take a lot of time. Maybe in such case (shard detached from engine, but failed to close) node must panic? There are two points here:
1. `SIGHUP` handler removes shards the same way: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/483a67b17064990d69e39e23d7b3fd6c72d9bfb3/pkg/local_object_storage/engine/shards.go#L192
2. It's unclear what to do with failed to close shards. These shards may be in some undefined state. If engine holds references of failed to close shards, then engine must be able to do something with these shards.
Also engine acquires lock to delete shards, but closing shard can take a lot of time.
Maybe in such case (shard detached from engine, but failed to close) node must panic?
fyrchik
commented
ok, let's leave it like this
dangerous, I think we better postpone this until we support >SIGHUP handler removes shards the same way
ok, let's leave it like this
>Maybe in such case (shard detached from engine, but failed to close) node must panic?
dangerous, I think we better postpone this until we support `disabled` mode, so that the shard is retained and alerts can be thworn
|
|||||||||
|
|
||||||||
|
deletedShards, err := e.deleteShards(ids)
|
||||||||
|
if err != nil {
|
||||||||
|
return err
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
return e.closeShards(deletedShards)
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
// closeShards closes deleted shards. Tries to close all shards.
|
||||||||
|
// Returns single error with joined shard errors.
|
||||||||
|
func (e *StorageEngine) closeShards(deletedShards []hashedShard) error {
|
||||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Do we have any reason not to do it in parallel (besides simplicity)? Do we have any reason not to do it in parallel (besides simplicity)?
dstepanov-yadro
commented
Right, fixed. Right, fixed.
|
|||||||||
|
var multiErr error
|
||||||||
|
var multiErrGuard sync.Mutex
|
||||||||
|
var eg errgroup.Group
|
||||||||
|
for _, sh := range deletedShards {
|
||||||||
|
sh := sh
|
||||||||
|
eg.Go(func() error {
|
||||||||
|
err := sh.SetMode(mode.Disabled)
|
||||||||
|
if err != nil {
|
||||||||
|
e.log.Error(logs.EngineCouldNotChangeShardModeToDisabled,
|
||||||||
|
zap.Stringer("id", sh.ID()),
|
||||||||
|
zap.Error(err),
|
||||||||
|
)
|
||||||||
|
multiErrGuard.Lock()
|
||||||||
|
multiErr = errors.Join(multiErr, fmt.Errorf("could not change shard (id:%s) mode to disabled: %w", sh.ID(), err))
|
||||||||
fyrchik
commented
`%s` doesn't require `.String()` in the argument list
dstepanov-yadro
commented
fixed fixed
|
|||||||||
|
multiErrGuard.Unlock()
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
err = sh.Close()
|
||||||||
|
if err != nil {
|
||||||||
|
e.log.Error(logs.EngineCouldNotCloseRemovedShard,
|
||||||||
|
zap.Stringer("id", sh.ID()),
|
||||||||
|
zap.Error(err),
|
||||||||
|
)
|
||||||||
|
multiErrGuard.Lock()
|
||||||||
|
multiErr = errors.Join(multiErr, fmt.Errorf("could not close removed shard (id:%s): %w", sh.ID(), err))
|
||||||||
|
multiErrGuard.Unlock()
|
||||||||
|
}
|
||||||||
|
return nil
|
||||||||
|
})
|
||||||||
|
}
|
||||||||
|
if err := eg.Wait(); err != nil {
|
||||||||
|
return err
|
||||||||
|
}
|
||||||||
|
return multiErr
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
// deleteShards deletes shards with specified ids from engine shard list
|
||||||||
|
// and releases all engine resources associated with shards.
|
||||||||
|
// Returns deleted shards or error if some shard could not be deleted.
|
||||||||
|
func (e *StorageEngine) deleteShards(ids []*shard.ID) ([]hashedShard, error) {
|
||||||||
|
ss := make([]hashedShard, 0, len(ids))
|
||||||||
|
|
||||||||
|
e.mtx.Lock()
|
||||||||
|
defer e.mtx.Unlock()
|
||||||||
|
|
||||||||
|
for _, id := range ids {
|
||||||||
|
idStr := id.String()
|
||||||||
|
sh, found := e.shards[idStr]
|
||||||||
|
if !found {
|
||||||||
|
return nil, errShardNotFound
|
||||||||
|
}
|
||||||||
|
ss = append(ss, sh)
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
if len(ss) == len(e.shards) {
|
||||||||
|
return nil, logicerr.New("could not delete all the shards")
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
for _, sh := range ss {
|
||||||||
|
idStr := sh.ID().String()
|
||||||||
|
|
||||||||
|
sh.DeleteShardMetrics()
|
||||||||
|
|
||||||||
|
delete(e.shards, idStr)
|
||||||||
|
|
||||||||
|
pool, ok := e.shardPools[idStr]
|
||||||||
|
if ok {
|
||||||||
|
pool.Release()
|
||||||||
|
delete(e.shardPools, idStr)
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
e.log.Info(logs.EngineShardHasBeenRemoved,
|
||||||||
|
zap.String("id", idStr))
|
||||||||
|
}
|
||||||||
|
|
||||||||
|
return ss, nil
|
||||||||
|
}
|
||||||||
|
|
||||||||
func (s hashedShard) Hash() uint64 {
|
func (s hashedShard) Hash() uint64 {
|
||||||||
return s.hash
|
return s.hash
|
||||||||
}
|
}
|
||||||||
|
|
|
@ -4,6 +4,8 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -42,3 +44,21 @@ func TestRemoveShard(t *testing.T) {
|
||||||
require.True(t, ok != removed)
|
require.True(t, ok != removed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDisableShards(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const numOfShards = 2
|
||||||
|
|
||||||
|
te := testNewEngine(t).setShardsNum(t, numOfShards)
|
||||||
|
e, ids := te.engine, te.shardIDs
|
||||||
|
defer func() { require.NoError(t, e.Close(context.Background())) }()
|
||||||
|
|
||||||
|
require.ErrorAs(t, e.DetachShards(ids), new(logicerr.Logical))
|
||||||
|
require.ErrorAs(t, e.DetachShards(nil), new(logicerr.Logical))
|
||||||
|
require.ErrorAs(t, e.DetachShards([]*shard.ID{}), new(logicerr.Logical))
|
||||||
|
|
||||||
|
require.NoError(t, e.DetachShards([]*shard.ID{ids[0]}))
|
||||||
|
|
||||||
|
require.Equal(t, 1, len(e.shards))
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Can we also add subsequent "readdition" of the removed shard here? (corresponds to sighup) Can we also add subsequent "readdition" of the removed shard here? (corresponds to sighup)
dstepanov-yadro
commented
It is too hard too reproduce It is too hard too reproduce `SIGHUP`
|
|||||||
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ const (
|
||||||
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
|
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
|
||||||
rpcSealWriteCache = "SealWriteCache"
|
rpcSealWriteCache = "SealWriteCache"
|
||||||
rpcListTargetsLocalOverrides = "ListTargetsLocalOverrides"
|
rpcListTargetsLocalOverrides = "ListTargetsLocalOverrides"
|
||||||
|
rpcDetachShards = "DetachShards"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HealthCheck executes ControlService.HealthCheck RPC.
|
// HealthCheck executes ControlService.HealthCheck RPC.
|
||||||
|
@ -292,3 +293,22 @@ func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...clie
|
||||||
|
|
||||||
return wResp.message, nil
|
return wResp.message, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DetachShards executes ControlService.DetachShards RPC.
|
||||||
|
func DetachShards(
|
||||||
|
cli *client.Client,
|
||||||
|
req *DetachShardsRequest,
|
||||||
|
opts ...client.CallOption,
|
||||||
|
) (*DetachShardsResponse, error) {
|
||||||
|
wResp := newResponseWrapper[DetachShardsResponse]()
|
||||||
|
|
||||||
|
wReq := &requestWrapper{
|
||||||
|
m: req,
|
||||||
|
}
|
||||||
|
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcDetachShards), wReq, wResp, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return wResp.message, nil
|
||||||
|
}
|
||||||
|
|
37
pkg/services/control/server/detach_shards.go
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
package control
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *Server) DetachShards(_ context.Context, req *control.DetachShardsRequest) (*control.DetachShardsResponse, error) {
|
||||||
|
err := s.isValidRequest(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
shardIDs := s.getShardIDList(req.GetBody().GetShard_ID())
|
||||||
|
|
||||||
|
if err := s.s.DetachShards(shardIDs); err != nil {
|
||||||
|
if errors.As(err, new(logicerr.Logical)) {
|
||||||
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||||
|
}
|
||||||
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := &control.DetachShardsResponse{
|
||||||
|
Body: &control.DetachShardsResponse_Body{},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = SignMessage(s.key, resp); err != nil {
|
||||||
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
}
|
1067
pkg/services/control/service.pb.go
generated
|
@ -62,6 +62,9 @@ service ControlService {
|
||||||
|
|
||||||
// Flush objects from write-cache and move it to degraded read only mode.
|
// Flush objects from write-cache and move it to degraded read only mode.
|
||||||
rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
|
rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
|
||||||
|
|
||||||
|
// DetachShards detaches and closes shards.
|
||||||
|
rpc DetachShards(DetachShardsRequest) returns (DetachShardsResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Health check request.
|
// Health check request.
|
||||||
|
@ -584,3 +587,21 @@ message SealWriteCacheResponse {
|
||||||
|
|
||||||
Signature signature = 2;
|
Signature signature = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message DetachShardsRequest {
|
||||||
|
message Body {
|
||||||
|
repeated bytes shard_ID = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
Body body = 1;
|
||||||
|
Signature signature = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message DetachShardsResponse {
|
||||||
|
message Body {
|
||||||
|
}
|
||||||
|
|
||||||
|
Body body = 1;
|
||||||
|
|
||||||
|
Signature signature = 2;
|
||||||
|
}
|
||||||
|
|
163
pkg/services/control/service_frostfs.pb.go
generated
|
@ -3144,3 +3144,166 @@ func (x *SealWriteCacheResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
func (x *SealWriteCacheResponse) SetSignature(sig *Signature) {
|
func (x *SealWriteCacheResponse) SetSignature(sig *Signature) {
|
||||||
x.Signature = sig
|
x.Signature = sig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *DetachShardsRequest_Body) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.RepeatedBytesSize(1, x.Shard_ID)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *DetachShardsRequest_Body) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.RepeatedBytesMarshal(1, buf[offset:], x.Shard_ID)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *DetachShardsRequest) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.NestedStructureSize(1, x.Body)
|
||||||
|
size += proto.NestedStructureSize(2, x.Signature)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *DetachShardsRequest) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||||
|
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadSignedData fills buf with signed data of x.
|
||||||
|
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data.
|
||||||
|
func (x *DetachShardsRequest) SignedDataSize() int {
|
||||||
|
return x.GetBody().StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SignedDataSize returns size of the request signed data in bytes.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data size.
|
||||||
|
func (x *DetachShardsRequest) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return x.GetBody().StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *DetachShardsRequest) SetSignature(sig *Signature) {
|
||||||
|
x.Signature = sig
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *DetachShardsResponse_Body) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *DetachShardsResponse_Body) StableMarshal(buf []byte) []byte {
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableSize returns the size of x in protobuf format.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary size.
|
||||||
|
func (x *DetachShardsResponse) StableSize() (size int) {
|
||||||
|
if x == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
size += proto.NestedStructureSize(1, x.Body)
|
||||||
|
size += proto.NestedStructureSize(2, x.Signature)
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
|
||||||
|
// StableMarshal marshals x in protobuf binary format with stable field order.
|
||||||
|
//
|
||||||
|
// If buffer length is less than x.StableSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same binary format.
|
||||||
|
func (x *DetachShardsResponse) StableMarshal(buf []byte) []byte {
|
||||||
|
if x == nil {
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
if buf == nil {
|
||||||
|
buf = make([]byte, x.StableSize())
|
||||||
|
}
|
||||||
|
var offset int
|
||||||
|
offset += proto.NestedStructureMarshal(1, buf[offset:], x.Body)
|
||||||
|
offset += proto.NestedStructureMarshal(2, buf[offset:], x.Signature)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadSignedData fills buf with signed data of x.
|
||||||
|
// If buffer length is less than x.SignedDataSize(), new buffer is allocated.
|
||||||
|
//
|
||||||
|
// Returns any error encountered which did not allow writing the data completely.
|
||||||
|
// Otherwise, returns the buffer in which the data is written.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data.
|
||||||
|
func (x *DetachShardsResponse) SignedDataSize() int {
|
||||||
|
return x.GetBody().StableSize()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SignedDataSize returns size of the request signed data in bytes.
|
||||||
|
//
|
||||||
|
// Structures with the same field values have the same signed data size.
|
||||||
|
func (x *DetachShardsResponse) ReadSignedData(buf []byte) ([]byte, error) {
|
||||||
|
return x.GetBody().StableMarshal(buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *DetachShardsResponse) SetSignature(sig *Signature) {
|
||||||
|
x.Signature = sig
|
||||||
|
}
|
||||||
|
|
39
pkg/services/control/service_grpc.pb.go
generated
|
@ -37,6 +37,7 @@ const (
|
||||||
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
|
ControlService_RemoveChainLocalOverride_FullMethodName = "/control.ControlService/RemoveChainLocalOverride"
|
||||||
ControlService_ListTargetsLocalOverrides_FullMethodName = "/control.ControlService/ListTargetsLocalOverrides"
|
ControlService_ListTargetsLocalOverrides_FullMethodName = "/control.ControlService/ListTargetsLocalOverrides"
|
||||||
ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache"
|
ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache"
|
||||||
|
ControlService_DetachShards_FullMethodName = "/control.ControlService/DetachShards"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ControlServiceClient is the client API for ControlService service.
|
// ControlServiceClient is the client API for ControlService service.
|
||||||
|
@ -80,6 +81,8 @@ type ControlServiceClient interface {
|
||||||
ListTargetsLocalOverrides(ctx context.Context, in *ListTargetsLocalOverridesRequest, opts ...grpc.CallOption) (*ListTargetsLocalOverridesResponse, error)
|
ListTargetsLocalOverrides(ctx context.Context, in *ListTargetsLocalOverridesRequest, opts ...grpc.CallOption) (*ListTargetsLocalOverridesResponse, error)
|
||||||
// Flush objects from write-cache and move it to degraded read only mode.
|
// Flush objects from write-cache and move it to degraded read only mode.
|
||||||
SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error)
|
SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error)
|
||||||
|
// DetachShards detaches and closes shards.
|
||||||
|
DetachShards(ctx context.Context, in *DetachShardsRequest, opts ...grpc.CallOption) (*DetachShardsResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type controlServiceClient struct {
|
type controlServiceClient struct {
|
||||||
|
@ -252,6 +255,15 @@ func (c *controlServiceClient) SealWriteCache(ctx context.Context, in *SealWrite
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *controlServiceClient) DetachShards(ctx context.Context, in *DetachShardsRequest, opts ...grpc.CallOption) (*DetachShardsResponse, error) {
|
||||||
|
out := new(DetachShardsResponse)
|
||||||
|
err := c.cc.Invoke(ctx, ControlService_DetachShards_FullMethodName, in, out, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ControlServiceServer is the server API for ControlService service.
|
// ControlServiceServer is the server API for ControlService service.
|
||||||
// All implementations should embed UnimplementedControlServiceServer
|
// All implementations should embed UnimplementedControlServiceServer
|
||||||
// for forward compatibility
|
// for forward compatibility
|
||||||
|
@ -293,6 +305,8 @@ type ControlServiceServer interface {
|
||||||
ListTargetsLocalOverrides(context.Context, *ListTargetsLocalOverridesRequest) (*ListTargetsLocalOverridesResponse, error)
|
ListTargetsLocalOverrides(context.Context, *ListTargetsLocalOverridesRequest) (*ListTargetsLocalOverridesResponse, error)
|
||||||
// Flush objects from write-cache and move it to degraded read only mode.
|
// Flush objects from write-cache and move it to degraded read only mode.
|
||||||
SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error)
|
SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error)
|
||||||
|
// DetachShards detaches and closes shards.
|
||||||
|
DetachShards(context.Context, *DetachShardsRequest) (*DetachShardsResponse, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnimplementedControlServiceServer should be embedded to have forward compatible implementations.
|
// UnimplementedControlServiceServer should be embedded to have forward compatible implementations.
|
||||||
|
@ -353,6 +367,9 @@ func (UnimplementedControlServiceServer) ListTargetsLocalOverrides(context.Conte
|
||||||
func (UnimplementedControlServiceServer) SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error) {
|
func (UnimplementedControlServiceServer) SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error) {
|
||||||
return nil, status.Errorf(codes.Unimplemented, "method SealWriteCache not implemented")
|
return nil, status.Errorf(codes.Unimplemented, "method SealWriteCache not implemented")
|
||||||
}
|
}
|
||||||
|
func (UnimplementedControlServiceServer) DetachShards(context.Context, *DetachShardsRequest) (*DetachShardsResponse, error) {
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method DetachShards not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
// UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service.
|
// UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||||
// Use of this interface is not recommended, as added methods to ControlServiceServer will
|
// Use of this interface is not recommended, as added methods to ControlServiceServer will
|
||||||
|
@ -689,6 +706,24 @@ func _ControlService_SealWriteCache_Handler(srv interface{}, ctx context.Context
|
||||||
return interceptor(ctx, in, info, handler)
|
return interceptor(ctx, in, info, handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func _ControlService_DetachShards_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(DetachShardsRequest)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(ControlServiceServer).DetachShards(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: ControlService_DetachShards_FullMethodName,
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(ControlServiceServer).DetachShards(ctx, req.(*DetachShardsRequest))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
// ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service.
|
// ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service.
|
||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
|
@ -768,6 +803,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{
|
||||||
MethodName: "SealWriteCache",
|
MethodName: "SealWriteCache",
|
||||||
Handler: _ControlService_SealWriteCache_Handler,
|
Handler: _ControlService_SealWriteCache_Handler,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
MethodName: "DetachShards",
|
||||||
|
Handler: _ControlService_DetachShards_Handler,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Streams: []grpc.StreamDesc{},
|
Streams: []grpc.StreamDesc{},
|
||||||
Metadata: "pkg/services/control/service.proto",
|
Metadata: "pkg/services/control/service.proto",
|
||||||
|
|
Why is it a separate method and not a
SetMode
extension?Because this 'DISABLED' mode is not an actual shard mode like 'READ-ONLY' or other. It has other meaning: detach shard (release all resources and close shard).
Now
engine.SetShardMode
with mode = mode.Disabled doesn't detach shard, but just moves shard toDISABLED
mode, so shard holds resources, but doesn't allow to read/write any objects.