[#1337] cli: Add control shards rebuild command

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-08-29 12:11:19 +03:00
parent 32a32e7725
commit 96e6817ebc
9 changed files with 314 additions and 0 deletions

View file

@ -0,0 +1,88 @@
package control
import (
"fmt"
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"github.com/mr-tron/base58"
"github.com/spf13/cobra"
)
const (
fillPercentFlag = "fill_percent"
)
var shardsRebuildCmd = &cobra.Command{
Use: "rebuild",
Short: "Rebuild shards",
Long: "Rebuild reclaims storage occupied by dead objects",
Run: shardsRebuild,
}
func shardsRebuild(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
req := &control.StartShardRebuildRequest{
Body: &control.StartShardRebuildRequest_Body{
Shard_ID: getShardIDList(cmd),
TargetFillPercent: getFillPercentValue(cmd),
ConcurrencyLimit: getConcurrencyValue(cmd),
},
}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.StartShardRebuildResponse
var err error
err = cli.ExecRaw(func(client *rawclient.Client) error {
resp, err = control.StartShardRebuild(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
var success, failed uint
for _, res := range resp.GetBody().GetResults() {
if res.GetSuccess() {
success++
cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID()))
} else {
failed++
cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError())
}
}
cmd.Printf("Total: %d success, %d failed\n", success, failed)
}
func getFillPercentValue(cmd *cobra.Command) uint32 {
v, _ := cmd.Flags().GetUint32(fillPercentFlag)
if v <= 0 || v > 100 {
commonCmd.ExitOnErr(cmd, "invalid fill_percent value", fmt.Errorf("fill_percent value must be (0, 100], current value: %d", v))
}
return v
}
func getConcurrencyValue(cmd *cobra.Command) uint32 {
v, _ := cmd.Flags().GetUint32(concurrencyFlag)
if v <= 0 || v > 10000 {
commonCmd.ExitOnErr(cmd, "invalid concurrency value", fmt.Errorf("concurrency value must be (0, 10 000], current value: %d", v))
}
return v
}
func initControlShardRebuildCmd() {
initControlFlags(shardsRebuildCmd)
flags := shardsRebuildCmd.Flags()
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
flags.Bool(shardAllFlag, false, "Process all shards")
flags.Uint32(fillPercentFlag, 80, "Target fill percent to reclaim space")
flags.Uint32(concurrencyFlag, 20, "Maximum count of concurrently rebuilding files")
setShardModeCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -28,4 +28,5 @@ func initControlShardsCmd() {
initControlDoctorCmd()
initControlShardsWritecacheCmd()
initControlShardsDetachCmd()
initControlShardRebuildCmd()
}

View file

@ -0,0 +1,90 @@
package engine
import (
"context"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
)
type RebuildPrm struct {
ShardIDs []*shard.ID
ConcurrencyLimit uint32
TargetFillPercent uint32
}
type ShardRebuildResult struct {
ShardID *shard.ID
Success bool
ErrorMsg string
}
type RebuildRes struct {
ShardResults []ShardRebuildResult
}
func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Rebuild",
trace.WithAttributes(
attribute.Int("shard_id_count", len(prm.ShardIDs)),
attribute.Int64("target_fill_percent", int64(prm.TargetFillPercent)),
attribute.Int64("concurrency_limit", int64(prm.ConcurrencyLimit)),
))
defer span.End()
res := RebuildRes{
ShardResults: make([]ShardRebuildResult, 0, len(prm.ShardIDs)),
}
resGuard := &sync.Mutex{}
limiter := newRebuildLimiter(prm.ConcurrencyLimit)
eg, egCtx := errgroup.WithContext(ctx)
for _, shardID := range prm.ShardIDs {
eg.Go(func() error {
e.mtx.RLock()
sh, ok := e.shards[shardID.String()]
e.mtx.RUnlock()
if !ok {
resGuard.Lock()
defer resGuard.Unlock()
res.ShardResults = append(res.ShardResults, ShardRebuildResult{
ShardID: shardID,
ErrorMsg: errShardNotFound.Error(),
})
return nil
}
err := sh.ScheduleRebuild(egCtx, shard.RebuildPrm{
ConcurrencyLimiter: limiter,
TargetFillPercent: prm.TargetFillPercent,
})
resGuard.Lock()
defer resGuard.Unlock()
if err != nil {
res.ShardResults = append(res.ShardResults, ShardRebuildResult{
ShardID: shardID,
ErrorMsg: err.Error(),
})
} else {
res.ShardResults = append(res.ShardResults, ShardRebuildResult{
ShardID: shardID,
Success: true,
})
}
return nil
})
}
if err := eg.Wait(); err != nil {
return RebuildRes{}, err
}
return res, nil
}

View file

@ -10,7 +10,10 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
@ -171,3 +174,33 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres
_, err := u.mb.UpdateStorageID(ctx, prm)
return err
}
type RebuildPrm struct {
ConcurrencyLimiter RebuildWorkerLimiter
TargetFillPercent uint32
}
func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ScheduleRebuild",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Int64("target_fill_percent", int64(p.TargetFillPercent)),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, common.RebuildAction{
SchemaChange: true,
FillPercent: true,
FillPercentValue: int(p.TargetFillPercent),
})
}

View file

@ -30,6 +30,7 @@ const (
rpcSealWriteCache = "SealWriteCache"
rpcListTargetsLocalOverrides = "ListTargetsLocalOverrides"
rpcDetachShards = "DetachShards"
rpcStartShardRebuild = "StartShardRebuild"
)
// HealthCheck executes ControlService.HealthCheck RPC.
@ -361,3 +362,16 @@ func DetachShards(
return wResp.message, nil
}
// StartShardRebuild executes ControlService.StartShardRebuild RPC.
func StartShardRebuild(cli *client.Client, req *StartShardRebuildRequest, opts ...client.CallOption) (*StartShardRebuildResponse, error) {
wResp := newResponseWrapper[StartShardRebuildResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStartShardRebuild), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}

View file

@ -0,0 +1,59 @@
package control
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *Server) StartShardRebuild(ctx context.Context, req *control.StartShardRebuildRequest) (*control.StartShardRebuildResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
if req.GetBody().GetConcurrencyLimit() == 0 || req.GetBody().GetConcurrencyLimit() > 10000 {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("concurrency limit must be in range (0; 10 000], current value %d", req.GetBody().GetConcurrencyLimit()))
}
if req.GetBody().GetTargetFillPercent() == 0 || req.GetBody().GetTargetFillPercent() > 100 {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("fill percent must be in range (0; 100], current value %d", req.GetBody().GetTargetFillPercent()))
}
prm := engine.RebuildPrm{
ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()),
ConcurrencyLimit: req.GetBody().GetConcurrencyLimit(),
TargetFillPercent: req.GetBody().GetTargetFillPercent(),
}
res, err := s.s.Rebuild(ctx, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.StartShardRebuildResponse{Body: &control.StartShardRebuildResponse_Body{}}
for _, r := range res.ShardResults {
if r.Success {
resp.Body.Results = append(resp.GetBody().GetResults(), &control.StartShardRebuildResponse_Body_Status{
Shard_ID: *r.ShardID,
Success: true,
})
} else {
resp.Body.Results = append(resp.GetBody().GetResults(), &control.StartShardRebuildResponse_Body_Status{
Shard_ID: *r.ShardID,
Error: r.ErrorMsg,
})
}
}
err = ctrlmessage.Sign(s.key, resp)
if err != nil {
return nil, err
}
return resp, nil
}

View file

@ -91,6 +91,9 @@ service ControlService {
// DetachShards detaches and closes shards.
rpc DetachShards(DetachShardsRequest) returns (DetachShardsResponse);
// StartShardRebuild starts shard rebuild process.
rpc StartShardRebuild(StartShardRebuildRequest) returns (StartShardRebuildResponse);
}
// Health check request.
@ -699,3 +702,29 @@ message DetachShardsResponse {
Signature signature = 2;
}
message StartShardRebuildRequest {
message Body {
repeated bytes shard_ID = 1;
uint32 target_fill_percent = 2;
uint32 concurrency_limit = 3;
}
Body body = 1;
Signature signature = 2;
}
message StartShardRebuildResponse {
message Body {
message Status {
bytes shard_ID = 1;
bool success = 2;
string error = 3;
}
repeated Status results = 1;
}
Body body = 1;
Signature signature = 2;
}

Binary file not shown.

Binary file not shown.