forked from TrueCloudLab/frostfs-node
[#1337] cli: Add control shards rebuild
command
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
d508da8397
commit
6b6eabe41c
9 changed files with 314 additions and 0 deletions
88
cmd/frostfs-cli/modules/control/rebuild_shards.go
Normal file
88
cmd/frostfs-cli/modules/control/rebuild_shards.go
Normal file
|
@ -0,0 +1,88 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
|
||||
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"github.com/mr-tron/base58"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
const (
|
||||
fillPercentFlag = "fill_percent"
|
||||
)
|
||||
|
||||
var shardsRebuildCmd = &cobra.Command{
|
||||
Use: "rebuild",
|
||||
Short: "Rebuild shards",
|
||||
Long: "Rebuild reclaims storage occupied by dead objects and adjusts the storage structure according to the configuration (for blobovnicza only now)",
|
||||
Run: shardsRebuild,
|
||||
}
|
||||
|
||||
func shardsRebuild(cmd *cobra.Command, _ []string) {
|
||||
pk := key.Get(cmd)
|
||||
|
||||
req := &control.StartShardRebuildRequest{
|
||||
Body: &control.StartShardRebuildRequest_Body{
|
||||
Shard_ID: getShardIDList(cmd),
|
||||
TargetFillPercent: getFillPercentValue(cmd),
|
||||
ConcurrencyLimit: getConcurrencyValue(cmd),
|
||||
},
|
||||
}
|
||||
|
||||
signRequest(cmd, pk, req)
|
||||
|
||||
cli := getClient(cmd, pk)
|
||||
|
||||
var resp *control.StartShardRebuildResponse
|
||||
var err error
|
||||
err = cli.ExecRaw(func(client *rawclient.Client) error {
|
||||
resp, err = control.StartShardRebuild(client, req)
|
||||
return err
|
||||
})
|
||||
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
|
||||
|
||||
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
|
||||
|
||||
var success, failed uint
|
||||
for _, res := range resp.GetBody().GetResults() {
|
||||
if res.GetSuccess() {
|
||||
success++
|
||||
cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID()))
|
||||
} else {
|
||||
failed++
|
||||
cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError())
|
||||
}
|
||||
}
|
||||
cmd.Printf("Total: %d success, %d failed\n", success, failed)
|
||||
}
|
||||
|
||||
func getFillPercentValue(cmd *cobra.Command) uint32 {
|
||||
v, _ := cmd.Flags().GetUint32(fillPercentFlag)
|
||||
if v <= 0 || v > 100 {
|
||||
commonCmd.ExitOnErr(cmd, "invalid fill_percent value", fmt.Errorf("fill_percent value must be (0, 100], current value: %d", v))
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func getConcurrencyValue(cmd *cobra.Command) uint32 {
|
||||
v, _ := cmd.Flags().GetUint32(concurrencyFlag)
|
||||
if v <= 0 || v > 10000 {
|
||||
commonCmd.ExitOnErr(cmd, "invalid concurrency value", fmt.Errorf("concurrency value must be (0, 10 000], current value: %d", v))
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func initControlShardRebuildCmd() {
|
||||
initControlFlags(shardsRebuildCmd)
|
||||
|
||||
flags := shardsRebuildCmd.Flags()
|
||||
flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
|
||||
flags.Bool(shardAllFlag, false, "Process all shards")
|
||||
flags.Uint32(fillPercentFlag, 80, "Target fill percent to reclaim space")
|
||||
flags.Uint32(concurrencyFlag, 20, "Maximum count of concurrently rebuilding files")
|
||||
setShardModeCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||
}
|
|
@ -28,4 +28,5 @@ func initControlShardsCmd() {
|
|||
initControlDoctorCmd()
|
||||
initControlShardsWritecacheCmd()
|
||||
initControlShardsDetachCmd()
|
||||
initControlShardRebuildCmd()
|
||||
}
|
||||
|
|
90
pkg/local_object_storage/engine/rebuild.go
Normal file
90
pkg/local_object_storage/engine/rebuild.go
Normal file
|
@ -0,0 +1,90 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type RebuildPrm struct {
|
||||
ShardIDs []*shard.ID
|
||||
ConcurrencyLimit uint32
|
||||
TargetFillPercent uint32
|
||||
}
|
||||
|
||||
type ShardRebuildResult struct {
|
||||
ShardID *shard.ID
|
||||
Success bool
|
||||
ErrorMsg string
|
||||
}
|
||||
|
||||
type RebuildRes struct {
|
||||
ShardResults []ShardRebuildResult
|
||||
}
|
||||
|
||||
func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Rebuild",
|
||||
trace.WithAttributes(
|
||||
attribute.Int("shard_id_count", len(prm.ShardIDs)),
|
||||
attribute.Int64("target_fill_percent", int64(prm.TargetFillPercent)),
|
||||
attribute.Int64("concurrency_limit", int64(prm.ConcurrencyLimit)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
res := RebuildRes{
|
||||
ShardResults: make([]ShardRebuildResult, 0, len(prm.ShardIDs)),
|
||||
}
|
||||
resGuard := &sync.Mutex{}
|
||||
|
||||
limiter := newRebuildLimiter(prm.ConcurrencyLimit)
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
for _, shardID := range prm.ShardIDs {
|
||||
eg.Go(func() error {
|
||||
e.mtx.RLock()
|
||||
sh, ok := e.shards[shardID.String()]
|
||||
e.mtx.RUnlock()
|
||||
|
||||
if !ok {
|
||||
resGuard.Lock()
|
||||
defer resGuard.Unlock()
|
||||
res.ShardResults = append(res.ShardResults, ShardRebuildResult{
|
||||
ShardID: shardID,
|
||||
ErrorMsg: errShardNotFound.Error(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
err := sh.ScheduleRebuild(egCtx, shard.RebuildPrm{
|
||||
ConcurrencyLimiter: limiter,
|
||||
TargetFillPercent: prm.TargetFillPercent,
|
||||
})
|
||||
|
||||
resGuard.Lock()
|
||||
defer resGuard.Unlock()
|
||||
|
||||
if err != nil {
|
||||
res.ShardResults = append(res.ShardResults, ShardRebuildResult{
|
||||
ShardID: shardID,
|
||||
ErrorMsg: err.Error(),
|
||||
})
|
||||
} else {
|
||||
res.ShardResults = append(res.ShardResults, ShardRebuildResult{
|
||||
ShardID: shardID,
|
||||
Success: true,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
return RebuildRes{}, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
|
@ -10,7 +10,10 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -171,3 +174,33 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres
|
|||
_, err := u.mb.UpdateStorageID(ctx, prm)
|
||||
return err
|
||||
}
|
||||
|
||||
type RebuildPrm struct {
|
||||
ConcurrencyLimiter RebuildWorkerLimiter
|
||||
TargetFillPercent uint32
|
||||
}
|
||||
|
||||
func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ScheduleRebuild",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.Int64("target_fill_percent", int64(p.TargetFillPercent)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
if s.info.Mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, common.RebuildAction{
|
||||
SchemaChange: true,
|
||||
FillPercent: true,
|
||||
FillPercentValue: int(p.TargetFillPercent),
|
||||
})
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ const (
|
|||
rpcSealWriteCache = "SealWriteCache"
|
||||
rpcListTargetsLocalOverrides = "ListTargetsLocalOverrides"
|
||||
rpcDetachShards = "DetachShards"
|
||||
rpcStartShardRebuild = "StartShardRebuild"
|
||||
)
|
||||
|
||||
// HealthCheck executes ControlService.HealthCheck RPC.
|
||||
|
@ -361,3 +362,16 @@ func DetachShards(
|
|||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
||||
// StartShardRebuild executes ControlService.StartShardRebuild RPC.
|
||||
func StartShardRebuild(cli *client.Client, req *StartShardRebuildRequest, opts ...client.CallOption) (*StartShardRebuildResponse, error) {
|
||||
wResp := newResponseWrapper[StartShardRebuildResponse]()
|
||||
wReq := &requestWrapper{m: req}
|
||||
|
||||
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStartShardRebuild), wReq, wResp, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return wResp.message, nil
|
||||
}
|
||||
|
|
59
pkg/services/control/server/rebuild.go
Normal file
59
pkg/services/control/server/rebuild.go
Normal file
|
@ -0,0 +1,59 @@
|
|||
package control
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func (s *Server) StartShardRebuild(ctx context.Context, req *control.StartShardRebuildRequest) (*control.StartShardRebuildResponse, error) {
|
||||
err := s.isValidRequest(req)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
}
|
||||
|
||||
if req.GetBody().GetConcurrencyLimit() == 0 || req.GetBody().GetConcurrencyLimit() > 10000 {
|
||||
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("concurrency limit must be in range (0; 10 000], current value %d", req.GetBody().GetConcurrencyLimit()))
|
||||
}
|
||||
|
||||
if req.GetBody().GetTargetFillPercent() == 0 || req.GetBody().GetTargetFillPercent() > 100 {
|
||||
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("fill percent must be in range (0; 100], current value %d", req.GetBody().GetTargetFillPercent()))
|
||||
}
|
||||
|
||||
prm := engine.RebuildPrm{
|
||||
ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()),
|
||||
ConcurrencyLimit: req.GetBody().GetConcurrencyLimit(),
|
||||
TargetFillPercent: req.GetBody().GetTargetFillPercent(),
|
||||
}
|
||||
|
||||
res, err := s.s.Rebuild(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
resp := &control.StartShardRebuildResponse{Body: &control.StartShardRebuildResponse_Body{}}
|
||||
for _, r := range res.ShardResults {
|
||||
if r.Success {
|
||||
resp.Body.Results = append(resp.GetBody().GetResults(), control.StartShardRebuildResponse_Body_Status{
|
||||
Shard_ID: *r.ShardID,
|
||||
Success: true,
|
||||
})
|
||||
} else {
|
||||
resp.Body.Results = append(resp.GetBody().GetResults(), control.StartShardRebuildResponse_Body_Status{
|
||||
Shard_ID: *r.ShardID,
|
||||
Error: r.ErrorMsg,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
err = ctrlmessage.Sign(s.key, resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
|
@ -91,6 +91,9 @@ service ControlService {
|
|||
|
||||
// DetachShards detaches and closes shards.
|
||||
rpc DetachShards(DetachShardsRequest) returns (DetachShardsResponse);
|
||||
|
||||
// StartShardRebuild starts shard rebuild process.
|
||||
rpc StartShardRebuild(StartShardRebuildRequest) returns (StartShardRebuildResponse);
|
||||
}
|
||||
|
||||
// Health check request.
|
||||
|
@ -699,3 +702,29 @@ message DetachShardsResponse {
|
|||
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
message StartShardRebuildRequest {
|
||||
message Body {
|
||||
repeated bytes shard_ID = 1;
|
||||
uint32 target_fill_percent = 2;
|
||||
uint32 concurrency_limit = 3;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
||||
message StartShardRebuildResponse {
|
||||
message Body {
|
||||
message Status {
|
||||
bytes shard_ID = 1;
|
||||
bool success = 2;
|
||||
string error = 3;
|
||||
}
|
||||
repeated Status results = 1;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
|
||||
Signature signature = 2;
|
||||
}
|
||||
|
|
BIN
pkg/services/control/service_frostfs.pb.go
generated
BIN
pkg/services/control/service_frostfs.pb.go
generated
Binary file not shown.
BIN
pkg/services/control/service_grpc.pb.go
generated
BIN
pkg/services/control/service_grpc.pb.go
generated
Binary file not shown.
Loading…
Reference in a new issue