From 6b6eabe41cd5750257adc3041f6a1c28df8197c7 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 29 Aug 2024 12:11:19 +0300 Subject: [PATCH] [#1337] cli: Add `control shards rebuild` command Signed-off-by: Dmitrii Stepanov --- .../modules/control/rebuild_shards.go | 88 ++ cmd/frostfs-cli/modules/control/shards.go | 1 + pkg/local_object_storage/engine/rebuild.go | 90 ++ pkg/local_object_storage/shard/rebuild.go | 33 + pkg/services/control/rpc.go | 14 + pkg/services/control/server/rebuild.go | 59 ++ pkg/services/control/service.proto | 29 + pkg/services/control/service_frostfs.pb.go | 918 ++++++++++++++++++ pkg/services/control/service_grpc.pb.go | 39 + 9 files changed, 1271 insertions(+) create mode 100644 cmd/frostfs-cli/modules/control/rebuild_shards.go create mode 100644 pkg/local_object_storage/engine/rebuild.go create mode 100644 pkg/services/control/server/rebuild.go diff --git a/cmd/frostfs-cli/modules/control/rebuild_shards.go b/cmd/frostfs-cli/modules/control/rebuild_shards.go new file mode 100644 index 000000000..e2b408712 --- /dev/null +++ b/cmd/frostfs-cli/modules/control/rebuild_shards.go @@ -0,0 +1,88 @@ +package control + +import ( + "fmt" + + rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "github.com/mr-tron/base58" + "github.com/spf13/cobra" +) + +const ( + fillPercentFlag = "fill_percent" +) + +var shardsRebuildCmd = &cobra.Command{ + Use: "rebuild", + Short: "Rebuild shards", + Long: "Rebuild reclaims storage occupied by dead objects and adjusts the storage structure according to the configuration (for blobovnicza only now)", + Run: shardsRebuild, +} + +func shardsRebuild(cmd *cobra.Command, _ []string) { + pk := key.Get(cmd) + + req := &control.StartShardRebuildRequest{ + Body: &control.StartShardRebuildRequest_Body{ + Shard_ID: getShardIDList(cmd), + TargetFillPercent: getFillPercentValue(cmd), + ConcurrencyLimit: getConcurrencyValue(cmd), + }, + } + + signRequest(cmd, pk, req) + + cli := getClient(cmd, pk) + + var resp *control.StartShardRebuildResponse + var err error + err = cli.ExecRaw(func(client *rawclient.Client) error { + resp, err = control.StartShardRebuild(client, req) + return err + }) + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + + verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) + + var success, failed uint + for _, res := range resp.GetBody().GetResults() { + if res.GetSuccess() { + success++ + cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID())) + } else { + failed++ + cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError()) + } + } + cmd.Printf("Total: %d success, %d failed\n", success, failed) +} + +func getFillPercentValue(cmd *cobra.Command) uint32 { + v, _ := cmd.Flags().GetUint32(fillPercentFlag) + if v <= 0 || v > 100 { + commonCmd.ExitOnErr(cmd, "invalid fill_percent value", fmt.Errorf("fill_percent value must be (0, 100], current value: %d", v)) + } + return v +} + +func getConcurrencyValue(cmd *cobra.Command) uint32 { + v, _ := cmd.Flags().GetUint32(concurrencyFlag) + if v <= 0 || v > 10000 { + commonCmd.ExitOnErr(cmd, "invalid concurrency value", fmt.Errorf("concurrency value must be (0, 10 000], current value: %d", v)) + } + return v +} + +func initControlShardRebuildCmd() { + initControlFlags(shardsRebuildCmd) + + flags := shardsRebuildCmd.Flags() + flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") + flags.Bool(shardAllFlag, false, "Process all shards") + flags.Uint32(fillPercentFlag, 80, "Target fill percent to reclaim space") + flags.Uint32(concurrencyFlag, 20, "Maximum count of concurrently rebuilding files") + setShardModeCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) +} diff --git a/cmd/frostfs-cli/modules/control/shards.go b/cmd/frostfs-cli/modules/control/shards.go index d8198c426..d6c2a0b9b 100644 --- a/cmd/frostfs-cli/modules/control/shards.go +++ b/cmd/frostfs-cli/modules/control/shards.go @@ -28,4 +28,5 @@ func initControlShardsCmd() { initControlDoctorCmd() initControlShardsWritecacheCmd() initControlShardsDetachCmd() + initControlShardRebuildCmd() } diff --git a/pkg/local_object_storage/engine/rebuild.go b/pkg/local_object_storage/engine/rebuild.go new file mode 100644 index 000000000..3970aae89 --- /dev/null +++ b/pkg/local_object_storage/engine/rebuild.go @@ -0,0 +1,90 @@ +package engine + +import ( + "context" + "sync" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" +) + +type RebuildPrm struct { + ShardIDs []*shard.ID + ConcurrencyLimit uint32 + TargetFillPercent uint32 +} + +type ShardRebuildResult struct { + ShardID *shard.ID + Success bool + ErrorMsg string +} + +type RebuildRes struct { + ShardResults []ShardRebuildResult +} + +func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Rebuild", + trace.WithAttributes( + attribute.Int("shard_id_count", len(prm.ShardIDs)), + attribute.Int64("target_fill_percent", int64(prm.TargetFillPercent)), + attribute.Int64("concurrency_limit", int64(prm.ConcurrencyLimit)), + )) + defer span.End() + + res := RebuildRes{ + ShardResults: make([]ShardRebuildResult, 0, len(prm.ShardIDs)), + } + resGuard := &sync.Mutex{} + + limiter := newRebuildLimiter(prm.ConcurrencyLimit) + + eg, egCtx := errgroup.WithContext(ctx) + for _, shardID := range prm.ShardIDs { + eg.Go(func() error { + e.mtx.RLock() + sh, ok := e.shards[shardID.String()] + e.mtx.RUnlock() + + if !ok { + resGuard.Lock() + defer resGuard.Unlock() + res.ShardResults = append(res.ShardResults, ShardRebuildResult{ + ShardID: shardID, + ErrorMsg: errShardNotFound.Error(), + }) + return nil + } + + err := sh.ScheduleRebuild(egCtx, shard.RebuildPrm{ + ConcurrencyLimiter: limiter, + TargetFillPercent: prm.TargetFillPercent, + }) + + resGuard.Lock() + defer resGuard.Unlock() + + if err != nil { + res.ShardResults = append(res.ShardResults, ShardRebuildResult{ + ShardID: shardID, + ErrorMsg: err.Error(), + }) + } else { + res.ShardResults = append(res.ShardResults, ShardRebuildResult{ + ShardID: shardID, + Success: true, + }) + } + return nil + }) + } + + if err := eg.Wait(); err != nil { + return RebuildRes{}, err + } + return res, nil +} diff --git a/pkg/local_object_storage/shard/rebuild.go b/pkg/local_object_storage/shard/rebuild.go index 998fcf08b..f8051999e 100644 --- a/pkg/local_object_storage/shard/rebuild.go +++ b/pkg/local_object_storage/shard/rebuild.go @@ -10,7 +10,10 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -171,3 +174,33 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres _, err := u.mb.UpdateStorageID(ctx, prm) return err } + +type RebuildPrm struct { + ConcurrencyLimiter RebuildWorkerLimiter + TargetFillPercent uint32 +} + +func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ScheduleRebuild", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.Int64("target_fill_percent", int64(p.TargetFillPercent)), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.ReadOnly() { + return ErrReadOnlyMode + } + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + + return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, common.RebuildAction{ + SchemaChange: true, + FillPercent: true, + FillPercentValue: int(p.TargetFillPercent), + }) +} diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index a90e58a65..80aece008 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -30,6 +30,7 @@ const ( rpcSealWriteCache = "SealWriteCache" rpcListTargetsLocalOverrides = "ListTargetsLocalOverrides" rpcDetachShards = "DetachShards" + rpcStartShardRebuild = "StartShardRebuild" ) // HealthCheck executes ControlService.HealthCheck RPC. @@ -361,3 +362,16 @@ func DetachShards( return wResp.message, nil } + +// StartShardRebuild executes ControlService.StartShardRebuild RPC. +func StartShardRebuild(cli *client.Client, req *StartShardRebuildRequest, opts ...client.CallOption) (*StartShardRebuildResponse, error) { + wResp := newResponseWrapper[StartShardRebuildResponse]() + wReq := &requestWrapper{m: req} + + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStartShardRebuild), wReq, wResp, opts...) + if err != nil { + return nil, err + } + + return wResp.message, nil +} diff --git a/pkg/services/control/server/rebuild.go b/pkg/services/control/server/rebuild.go new file mode 100644 index 000000000..6ddfb8bf4 --- /dev/null +++ b/pkg/services/control/server/rebuild.go @@ -0,0 +1,59 @@ +package control + +import ( + "context" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Server) StartShardRebuild(ctx context.Context, req *control.StartShardRebuildRequest) (*control.StartShardRebuildResponse, error) { + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + if req.GetBody().GetConcurrencyLimit() == 0 || req.GetBody().GetConcurrencyLimit() > 10000 { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("concurrency limit must be in range (0; 10 000], current value %d", req.GetBody().GetConcurrencyLimit())) + } + + if req.GetBody().GetTargetFillPercent() == 0 || req.GetBody().GetTargetFillPercent() > 100 { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("fill percent must be in range (0; 100], current value %d", req.GetBody().GetTargetFillPercent())) + } + + prm := engine.RebuildPrm{ + ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()), + ConcurrencyLimit: req.GetBody().GetConcurrencyLimit(), + TargetFillPercent: req.GetBody().GetTargetFillPercent(), + } + + res, err := s.s.Rebuild(ctx, prm) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + resp := &control.StartShardRebuildResponse{Body: &control.StartShardRebuildResponse_Body{}} + for _, r := range res.ShardResults { + if r.Success { + resp.Body.Results = append(resp.GetBody().GetResults(), control.StartShardRebuildResponse_Body_Status{ + Shard_ID: *r.ShardID, + Success: true, + }) + } else { + resp.Body.Results = append(resp.GetBody().GetResults(), control.StartShardRebuildResponse_Body_Status{ + Shard_ID: *r.ShardID, + Error: r.ErrorMsg, + }) + } + } + + err = ctrlmessage.Sign(s.key, resp) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index d6639cb48..04994328a 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -91,6 +91,9 @@ service ControlService { // DetachShards detaches and closes shards. rpc DetachShards(DetachShardsRequest) returns (DetachShardsResponse); + + // StartShardRebuild starts shard rebuild process. + rpc StartShardRebuild(StartShardRebuildRequest) returns (StartShardRebuildResponse); } // Health check request. @@ -699,3 +702,29 @@ message DetachShardsResponse { Signature signature = 2; } + +message StartShardRebuildRequest { + message Body { + repeated bytes shard_ID = 1; + uint32 target_fill_percent = 2; + uint32 concurrency_limit = 3; + } + + Body body = 1; + Signature signature = 2; +} + +message StartShardRebuildResponse { + message Body { + message Status { + bytes shard_ID = 1; + bool success = 2; + string error = 3; + } + repeated Status results = 1; + } + + Body body = 1; + + Signature signature = 2; +} diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index eb0d95c64..019cac290 100644 --- a/pkg/services/control/service_frostfs.pb.go +++ b/pkg/services/control/service_frostfs.pb.go @@ -15023,3 +15023,921 @@ func (x *DetachShardsResponse) UnmarshalEasyJSON(in *jlexer.Lexer) { in.Consumed() } } + +type StartShardRebuildRequest_Body struct { + Shard_ID [][]byte `json:"shardID"` + TargetFillPercent uint32 `json:"targetFillPercent"` + ConcurrencyLimit uint32 `json:"concurrencyLimit"` +} + +var ( + _ encoding.ProtoMarshaler = (*StartShardRebuildRequest_Body)(nil) + _ encoding.ProtoUnmarshaler = (*StartShardRebuildRequest_Body)(nil) + _ json.Marshaler = (*StartShardRebuildRequest_Body)(nil) + _ json.Unmarshaler = (*StartShardRebuildRequest_Body)(nil) +) + +// StableSize returns the size of x in protobuf format. +// +// Structures with the same field values have the same binary size. +func (x *StartShardRebuildRequest_Body) StableSize() (size int) { + if x == nil { + return 0 + } + size += proto.RepeatedBytesSize(1, x.Shard_ID) + size += proto.UInt32Size(2, x.TargetFillPercent) + size += proto.UInt32Size(3, x.ConcurrencyLimit) + return size +} + +// MarshalProtobuf implements the encoding.ProtoMarshaler interface. +func (x *StartShardRebuildRequest_Body) MarshalProtobuf(dst []byte) []byte { + m := pool.MarshalerPool.Get() + defer pool.MarshalerPool.Put(m) + x.EmitProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + return dst +} + +func (x *StartShardRebuildRequest_Body) EmitProtobuf(mm *easyproto.MessageMarshaler) { + if x == nil { + return + } + for j := range x.Shard_ID { + mm.AppendBytes(1, x.Shard_ID[j]) + } + if x.TargetFillPercent != 0 { + mm.AppendUint32(2, x.TargetFillPercent) + } + if x.ConcurrencyLimit != 0 { + mm.AppendUint32(3, x.ConcurrencyLimit) + } +} + +// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface. +func (x *StartShardRebuildRequest_Body) UnmarshalProtobuf(src []byte) (err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in %s", "StartShardRebuildRequest_Body") + } + switch fc.FieldNum { + case 1: // Shard_ID + data, ok := fc.Bytes() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Shard_ID") + } + x.Shard_ID = append(x.Shard_ID, data) + case 2: // TargetFillPercent + data, ok := fc.Uint32() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "TargetFillPercent") + } + x.TargetFillPercent = data + case 3: // ConcurrencyLimit + data, ok := fc.Uint32() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "ConcurrencyLimit") + } + x.ConcurrencyLimit = data + } + } + return nil +} +func (x *StartShardRebuildRequest_Body) GetShard_ID() [][]byte { + if x != nil { + return x.Shard_ID + } + return nil +} +func (x *StartShardRebuildRequest_Body) SetShard_ID(v [][]byte) { + x.Shard_ID = v +} +func (x *StartShardRebuildRequest_Body) GetTargetFillPercent() uint32 { + if x != nil { + return x.TargetFillPercent + } + return 0 +} +func (x *StartShardRebuildRequest_Body) SetTargetFillPercent(v uint32) { + x.TargetFillPercent = v +} +func (x *StartShardRebuildRequest_Body) GetConcurrencyLimit() uint32 { + if x != nil { + return x.ConcurrencyLimit + } + return 0 +} +func (x *StartShardRebuildRequest_Body) SetConcurrencyLimit(v uint32) { + x.ConcurrencyLimit = v +} + +// MarshalJSON implements the json.Marshaler interface. +func (x *StartShardRebuildRequest_Body) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + x.MarshalEasyJSON(&w) + return w.Buffer.BuildBytes(), w.Error +} +func (x *StartShardRebuildRequest_Body) MarshalEasyJSON(out *jwriter.Writer) { + if x == nil { + out.RawString("null") + return + } + out.RawByte('{') + { + const prefix string = ",\"shardID\":" + out.RawString(prefix[1:]) + out.RawByte('[') + for i := range x.Shard_ID { + if i != 0 { + out.RawByte(',') + } + out.Base64Bytes(x.Shard_ID[i]) + } + out.RawByte(']') + } + { + const prefix string = ",\"targetFillPercent\":" + out.RawString(prefix) + out.Uint32(x.TargetFillPercent) + } + { + const prefix string = ",\"concurrencyLimit\":" + out.RawString(prefix) + out.Uint32(x.ConcurrencyLimit) + } + out.RawByte('}') +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (x *StartShardRebuildRequest_Body) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + x.UnmarshalEasyJSON(&r) + return r.Error() +} +func (x *StartShardRebuildRequest_Body) UnmarshalEasyJSON(in *jlexer.Lexer) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "shardID": + { + var f []byte + var list [][]byte + in.Delim('[') + for !in.IsDelim(']') { + f = in.Bytes() + list = append(list, f) + in.WantComma() + } + x.Shard_ID = list + in.Delim(']') + } + case "targetFillPercent": + { + var f uint32 + f = in.Uint32() + x.TargetFillPercent = f + } + case "concurrencyLimit": + { + var f uint32 + f = in.Uint32() + x.ConcurrencyLimit = f + } + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} + +type StartShardRebuildRequest struct { + Body *StartShardRebuildRequest_Body `json:"body"` + Signature *Signature `json:"signature"` +} + +var ( + _ encoding.ProtoMarshaler = (*StartShardRebuildRequest)(nil) + _ encoding.ProtoUnmarshaler = (*StartShardRebuildRequest)(nil) + _ json.Marshaler = (*StartShardRebuildRequest)(nil) + _ json.Unmarshaler = (*StartShardRebuildRequest)(nil) +) + +// StableSize returns the size of x in protobuf format. +// +// Structures with the same field values have the same binary size. +func (x *StartShardRebuildRequest) StableSize() (size int) { + if x == nil { + return 0 + } + size += proto.NestedStructureSize(1, x.Body) + size += proto.NestedStructureSize(2, x.Signature) + return size +} + +// ReadSignedData fills buf with signed data of x. +// If buffer length is less than x.SignedDataSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same signed data. +func (x *StartShardRebuildRequest) SignedDataSize() int { + return x.GetBody().StableSize() +} + +// SignedDataSize returns size of the request signed data in bytes. +// +// Structures with the same field values have the same signed data size. +func (x *StartShardRebuildRequest) ReadSignedData(buf []byte) ([]byte, error) { + return x.GetBody().MarshalProtobuf(buf), nil +} + +// MarshalProtobuf implements the encoding.ProtoMarshaler interface. +func (x *StartShardRebuildRequest) MarshalProtobuf(dst []byte) []byte { + m := pool.MarshalerPool.Get() + defer pool.MarshalerPool.Put(m) + x.EmitProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + return dst +} + +func (x *StartShardRebuildRequest) EmitProtobuf(mm *easyproto.MessageMarshaler) { + if x == nil { + return + } + if x.Body != nil { + x.Body.EmitProtobuf(mm.AppendMessage(1)) + } + if x.Signature != nil { + x.Signature.EmitProtobuf(mm.AppendMessage(2)) + } +} + +// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface. +func (x *StartShardRebuildRequest) UnmarshalProtobuf(src []byte) (err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in %s", "StartShardRebuildRequest") + } + switch fc.FieldNum { + case 1: // Body + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Body") + } + x.Body = new(StartShardRebuildRequest_Body) + if err := x.Body.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + case 2: // Signature + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Signature") + } + x.Signature = new(Signature) + if err := x.Signature.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + } + } + return nil +} +func (x *StartShardRebuildRequest) GetBody() *StartShardRebuildRequest_Body { + if x != nil { + return x.Body + } + return nil +} +func (x *StartShardRebuildRequest) SetBody(v *StartShardRebuildRequest_Body) { + x.Body = v +} +func (x *StartShardRebuildRequest) GetSignature() *Signature { + if x != nil { + return x.Signature + } + return nil +} +func (x *StartShardRebuildRequest) SetSignature(v *Signature) { + x.Signature = v +} + +// MarshalJSON implements the json.Marshaler interface. +func (x *StartShardRebuildRequest) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + x.MarshalEasyJSON(&w) + return w.Buffer.BuildBytes(), w.Error +} +func (x *StartShardRebuildRequest) MarshalEasyJSON(out *jwriter.Writer) { + if x == nil { + out.RawString("null") + return + } + out.RawByte('{') + { + const prefix string = ",\"body\":" + out.RawString(prefix[1:]) + x.Body.MarshalEasyJSON(out) + } + { + const prefix string = ",\"signature\":" + out.RawString(prefix) + x.Signature.MarshalEasyJSON(out) + } + out.RawByte('}') +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (x *StartShardRebuildRequest) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + x.UnmarshalEasyJSON(&r) + return r.Error() +} +func (x *StartShardRebuildRequest) UnmarshalEasyJSON(in *jlexer.Lexer) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "body": + { + var f *StartShardRebuildRequest_Body + f = new(StartShardRebuildRequest_Body) + f.UnmarshalEasyJSON(in) + x.Body = f + } + case "signature": + { + var f *Signature + f = new(Signature) + f.UnmarshalEasyJSON(in) + x.Signature = f + } + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} + +type StartShardRebuildResponse_Body_Status struct { + Shard_ID []byte `json:"shardID"` + Success bool `json:"success"` + Error string `json:"error"` +} + +var ( + _ encoding.ProtoMarshaler = (*StartShardRebuildResponse_Body_Status)(nil) + _ encoding.ProtoUnmarshaler = (*StartShardRebuildResponse_Body_Status)(nil) + _ json.Marshaler = (*StartShardRebuildResponse_Body_Status)(nil) + _ json.Unmarshaler = (*StartShardRebuildResponse_Body_Status)(nil) +) + +// StableSize returns the size of x in protobuf format. +// +// Structures with the same field values have the same binary size. +func (x *StartShardRebuildResponse_Body_Status) StableSize() (size int) { + if x == nil { + return 0 + } + size += proto.BytesSize(1, x.Shard_ID) + size += proto.BoolSize(2, x.Success) + size += proto.StringSize(3, x.Error) + return size +} + +// MarshalProtobuf implements the encoding.ProtoMarshaler interface. +func (x *StartShardRebuildResponse_Body_Status) MarshalProtobuf(dst []byte) []byte { + m := pool.MarshalerPool.Get() + defer pool.MarshalerPool.Put(m) + x.EmitProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + return dst +} + +func (x *StartShardRebuildResponse_Body_Status) EmitProtobuf(mm *easyproto.MessageMarshaler) { + if x == nil { + return + } + if len(x.Shard_ID) != 0 { + mm.AppendBytes(1, x.Shard_ID) + } + if x.Success { + mm.AppendBool(2, x.Success) + } + if len(x.Error) != 0 { + mm.AppendString(3, x.Error) + } +} + +// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface. +func (x *StartShardRebuildResponse_Body_Status) UnmarshalProtobuf(src []byte) (err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in %s", "StartShardRebuildResponse_Body_Status") + } + switch fc.FieldNum { + case 1: // Shard_ID + data, ok := fc.Bytes() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Shard_ID") + } + x.Shard_ID = data + case 2: // Success + data, ok := fc.Bool() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Success") + } + x.Success = data + case 3: // Error + data, ok := fc.String() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Error") + } + x.Error = data + } + } + return nil +} +func (x *StartShardRebuildResponse_Body_Status) GetShard_ID() []byte { + if x != nil { + return x.Shard_ID + } + return nil +} +func (x *StartShardRebuildResponse_Body_Status) SetShard_ID(v []byte) { + x.Shard_ID = v +} +func (x *StartShardRebuildResponse_Body_Status) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} +func (x *StartShardRebuildResponse_Body_Status) SetSuccess(v bool) { + x.Success = v +} +func (x *StartShardRebuildResponse_Body_Status) GetError() string { + if x != nil { + return x.Error + } + return "" +} +func (x *StartShardRebuildResponse_Body_Status) SetError(v string) { + x.Error = v +} + +// MarshalJSON implements the json.Marshaler interface. +func (x *StartShardRebuildResponse_Body_Status) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + x.MarshalEasyJSON(&w) + return w.Buffer.BuildBytes(), w.Error +} +func (x *StartShardRebuildResponse_Body_Status) MarshalEasyJSON(out *jwriter.Writer) { + if x == nil { + out.RawString("null") + return + } + out.RawByte('{') + { + const prefix string = ",\"shardID\":" + out.RawString(prefix[1:]) + out.Base64Bytes(x.Shard_ID) + } + { + const prefix string = ",\"success\":" + out.RawString(prefix) + out.Bool(x.Success) + } + { + const prefix string = ",\"error\":" + out.RawString(prefix) + out.String(x.Error) + } + out.RawByte('}') +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (x *StartShardRebuildResponse_Body_Status) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + x.UnmarshalEasyJSON(&r) + return r.Error() +} +func (x *StartShardRebuildResponse_Body_Status) UnmarshalEasyJSON(in *jlexer.Lexer) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "shardID": + { + var f []byte + f = in.Bytes() + x.Shard_ID = f + } + case "success": + { + var f bool + f = in.Bool() + x.Success = f + } + case "error": + { + var f string + f = in.String() + x.Error = f + } + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} + +type StartShardRebuildResponse_Body struct { + Results []StartShardRebuildResponse_Body_Status `json:"results"` +} + +var ( + _ encoding.ProtoMarshaler = (*StartShardRebuildResponse_Body)(nil) + _ encoding.ProtoUnmarshaler = (*StartShardRebuildResponse_Body)(nil) + _ json.Marshaler = (*StartShardRebuildResponse_Body)(nil) + _ json.Unmarshaler = (*StartShardRebuildResponse_Body)(nil) +) + +// StableSize returns the size of x in protobuf format. +// +// Structures with the same field values have the same binary size. +func (x *StartShardRebuildResponse_Body) StableSize() (size int) { + if x == nil { + return 0 + } + for i := range x.Results { + size += proto.NestedStructureSizeUnchecked(1, &x.Results[i]) + } + return size +} + +// MarshalProtobuf implements the encoding.ProtoMarshaler interface. +func (x *StartShardRebuildResponse_Body) MarshalProtobuf(dst []byte) []byte { + m := pool.MarshalerPool.Get() + defer pool.MarshalerPool.Put(m) + x.EmitProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + return dst +} + +func (x *StartShardRebuildResponse_Body) EmitProtobuf(mm *easyproto.MessageMarshaler) { + if x == nil { + return + } + for i := range x.Results { + x.Results[i].EmitProtobuf(mm.AppendMessage(1)) + } +} + +// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface. +func (x *StartShardRebuildResponse_Body) UnmarshalProtobuf(src []byte) (err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in %s", "StartShardRebuildResponse_Body") + } + switch fc.FieldNum { + case 1: // Results + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Results") + } + x.Results = append(x.Results, StartShardRebuildResponse_Body_Status{}) + ff := &x.Results[len(x.Results)-1] + if err := ff.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + } + } + return nil +} +func (x *StartShardRebuildResponse_Body) GetResults() []StartShardRebuildResponse_Body_Status { + if x != nil { + return x.Results + } + return nil +} +func (x *StartShardRebuildResponse_Body) SetResults(v []StartShardRebuildResponse_Body_Status) { + x.Results = v +} + +// MarshalJSON implements the json.Marshaler interface. +func (x *StartShardRebuildResponse_Body) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + x.MarshalEasyJSON(&w) + return w.Buffer.BuildBytes(), w.Error +} +func (x *StartShardRebuildResponse_Body) MarshalEasyJSON(out *jwriter.Writer) { + if x == nil { + out.RawString("null") + return + } + out.RawByte('{') + { + const prefix string = ",\"results\":" + out.RawString(prefix[1:]) + out.RawByte('[') + for i := range x.Results { + if i != 0 { + out.RawByte(',') + } + x.Results[i].MarshalEasyJSON(out) + } + out.RawByte(']') + } + out.RawByte('}') +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (x *StartShardRebuildResponse_Body) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + x.UnmarshalEasyJSON(&r) + return r.Error() +} +func (x *StartShardRebuildResponse_Body) UnmarshalEasyJSON(in *jlexer.Lexer) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "results": + { + var f StartShardRebuildResponse_Body_Status + var list []StartShardRebuildResponse_Body_Status + in.Delim('[') + for !in.IsDelim(']') { + f = StartShardRebuildResponse_Body_Status{} + f.UnmarshalEasyJSON(in) + list = append(list, f) + in.WantComma() + } + x.Results = list + in.Delim(']') + } + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} + +type StartShardRebuildResponse struct { + Body *StartShardRebuildResponse_Body `json:"body"` + Signature *Signature `json:"signature"` +} + +var ( + _ encoding.ProtoMarshaler = (*StartShardRebuildResponse)(nil) + _ encoding.ProtoUnmarshaler = (*StartShardRebuildResponse)(nil) + _ json.Marshaler = (*StartShardRebuildResponse)(nil) + _ json.Unmarshaler = (*StartShardRebuildResponse)(nil) +) + +// StableSize returns the size of x in protobuf format. +// +// Structures with the same field values have the same binary size. +func (x *StartShardRebuildResponse) StableSize() (size int) { + if x == nil { + return 0 + } + size += proto.NestedStructureSize(1, x.Body) + size += proto.NestedStructureSize(2, x.Signature) + return size +} + +// ReadSignedData fills buf with signed data of x. +// If buffer length is less than x.SignedDataSize(), new buffer is allocated. +// +// Returns any error encountered which did not allow writing the data completely. +// Otherwise, returns the buffer in which the data is written. +// +// Structures with the same field values have the same signed data. +func (x *StartShardRebuildResponse) SignedDataSize() int { + return x.GetBody().StableSize() +} + +// SignedDataSize returns size of the request signed data in bytes. +// +// Structures with the same field values have the same signed data size. +func (x *StartShardRebuildResponse) ReadSignedData(buf []byte) ([]byte, error) { + return x.GetBody().MarshalProtobuf(buf), nil +} + +// MarshalProtobuf implements the encoding.ProtoMarshaler interface. +func (x *StartShardRebuildResponse) MarshalProtobuf(dst []byte) []byte { + m := pool.MarshalerPool.Get() + defer pool.MarshalerPool.Put(m) + x.EmitProtobuf(m.MessageMarshaler()) + dst = m.Marshal(dst) + return dst +} + +func (x *StartShardRebuildResponse) EmitProtobuf(mm *easyproto.MessageMarshaler) { + if x == nil { + return + } + if x.Body != nil { + x.Body.EmitProtobuf(mm.AppendMessage(1)) + } + if x.Signature != nil { + x.Signature.EmitProtobuf(mm.AppendMessage(2)) + } +} + +// UnmarshalProtobuf implements the encoding.ProtoUnmarshaler interface. +func (x *StartShardRebuildResponse) UnmarshalProtobuf(src []byte) (err error) { + var fc easyproto.FieldContext + for len(src) > 0 { + src, err = fc.NextField(src) + if err != nil { + return fmt.Errorf("cannot read next field in %s", "StartShardRebuildResponse") + } + switch fc.FieldNum { + case 1: // Body + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Body") + } + x.Body = new(StartShardRebuildResponse_Body) + if err := x.Body.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + case 2: // Signature + data, ok := fc.MessageData() + if !ok { + return fmt.Errorf("cannot unmarshal field %s", "Signature") + } + x.Signature = new(Signature) + if err := x.Signature.UnmarshalProtobuf(data); err != nil { + return fmt.Errorf("unmarshal: %w", err) + } + } + } + return nil +} +func (x *StartShardRebuildResponse) GetBody() *StartShardRebuildResponse_Body { + if x != nil { + return x.Body + } + return nil +} +func (x *StartShardRebuildResponse) SetBody(v *StartShardRebuildResponse_Body) { + x.Body = v +} +func (x *StartShardRebuildResponse) GetSignature() *Signature { + if x != nil { + return x.Signature + } + return nil +} +func (x *StartShardRebuildResponse) SetSignature(v *Signature) { + x.Signature = v +} + +// MarshalJSON implements the json.Marshaler interface. +func (x *StartShardRebuildResponse) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + x.MarshalEasyJSON(&w) + return w.Buffer.BuildBytes(), w.Error +} +func (x *StartShardRebuildResponse) MarshalEasyJSON(out *jwriter.Writer) { + if x == nil { + out.RawString("null") + return + } + out.RawByte('{') + { + const prefix string = ",\"body\":" + out.RawString(prefix[1:]) + x.Body.MarshalEasyJSON(out) + } + { + const prefix string = ",\"signature\":" + out.RawString(prefix) + x.Signature.MarshalEasyJSON(out) + } + out.RawByte('}') +} + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (x *StartShardRebuildResponse) UnmarshalJSON(data []byte) error { + r := jlexer.Lexer{Data: data} + x.UnmarshalEasyJSON(&r) + return r.Error() +} +func (x *StartShardRebuildResponse) UnmarshalEasyJSON(in *jlexer.Lexer) { + isTopLevel := in.IsStart() + if in.IsNull() { + if isTopLevel { + in.Consumed() + } + in.Skip() + return + } + in.Delim('{') + for !in.IsDelim('}') { + key := in.UnsafeFieldName(false) + in.WantColon() + if in.IsNull() { + in.Skip() + in.WantComma() + continue + } + switch key { + case "body": + { + var f *StartShardRebuildResponse_Body + f = new(StartShardRebuildResponse_Body) + f.UnmarshalEasyJSON(in) + x.Body = f + } + case "signature": + { + var f *Signature + f = new(Signature) + f.UnmarshalEasyJSON(in) + x.Signature = f + } + } + in.WantComma() + } + in.Delim('}') + if isTopLevel { + in.Consumed() + } +} diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go index fa9de974a..f5cfefa85 100644 --- a/pkg/services/control/service_grpc.pb.go +++ b/pkg/services/control/service_grpc.pb.go @@ -41,6 +41,7 @@ const ( ControlService_ListTargetsLocalOverrides_FullMethodName = "/control.ControlService/ListTargetsLocalOverrides" ControlService_SealWriteCache_FullMethodName = "/control.ControlService/SealWriteCache" ControlService_DetachShards_FullMethodName = "/control.ControlService/DetachShards" + ControlService_StartShardRebuild_FullMethodName = "/control.ControlService/StartShardRebuild" ) // ControlServiceClient is the client API for ControlService service. @@ -97,6 +98,8 @@ type ControlServiceClient interface { SealWriteCache(ctx context.Context, in *SealWriteCacheRequest, opts ...grpc.CallOption) (*SealWriteCacheResponse, error) // DetachShards detaches and closes shards. DetachShards(ctx context.Context, in *DetachShardsRequest, opts ...grpc.CallOption) (*DetachShardsResponse, error) + // StartShardRebuild starts shard rebuild process. + StartShardRebuild(ctx context.Context, in *StartShardRebuildRequest, opts ...grpc.CallOption) (*StartShardRebuildResponse, error) } type controlServiceClient struct { @@ -305,6 +308,15 @@ func (c *controlServiceClient) DetachShards(ctx context.Context, in *DetachShard return out, nil } +func (c *controlServiceClient) StartShardRebuild(ctx context.Context, in *StartShardRebuildRequest, opts ...grpc.CallOption) (*StartShardRebuildResponse, error) { + out := new(StartShardRebuildResponse) + err := c.cc.Invoke(ctx, ControlService_StartShardRebuild_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ControlServiceServer is the server API for ControlService service. // All implementations should embed UnimplementedControlServiceServer // for forward compatibility @@ -359,6 +371,8 @@ type ControlServiceServer interface { SealWriteCache(context.Context, *SealWriteCacheRequest) (*SealWriteCacheResponse, error) // DetachShards detaches and closes shards. DetachShards(context.Context, *DetachShardsRequest) (*DetachShardsResponse, error) + // StartShardRebuild starts shard rebuild process. + StartShardRebuild(context.Context, *StartShardRebuildRequest) (*StartShardRebuildResponse, error) } // UnimplementedControlServiceServer should be embedded to have forward compatible implementations. @@ -431,6 +445,9 @@ func (UnimplementedControlServiceServer) SealWriteCache(context.Context, *SealWr func (UnimplementedControlServiceServer) DetachShards(context.Context, *DetachShardsRequest) (*DetachShardsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method DetachShards not implemented") } +func (UnimplementedControlServiceServer) StartShardRebuild(context.Context, *StartShardRebuildRequest) (*StartShardRebuildResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method StartShardRebuild not implemented") +} // UnsafeControlServiceServer may be embedded to opt out of forward compatibility for this service. // Use of this interface is not recommended, as added methods to ControlServiceServer will @@ -839,6 +856,24 @@ func _ControlService_DetachShards_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _ControlService_StartShardRebuild_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StartShardRebuildRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ControlServiceServer).StartShardRebuild(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ControlService_StartShardRebuild_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ControlServiceServer).StartShardRebuild(ctx, req.(*StartShardRebuildRequest)) + } + return interceptor(ctx, in, info, handler) +} + // ControlService_ServiceDesc is the grpc.ServiceDesc for ControlService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -934,6 +969,10 @@ var ControlService_ServiceDesc = grpc.ServiceDesc{ MethodName: "DetachShards", Handler: _ControlService_DetachShards_Handler, }, + { + MethodName: "StartShardRebuild", + Handler: _ControlService_StartShardRebuild_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "pkg/services/control/service.proto",