diff --git a/cmd/frostfs-cli/modules/control/rebuild_shards.go b/cmd/frostfs-cli/modules/control/rebuild_shards.go
new file mode 100644
index 000000000..e2b408712
--- /dev/null
+++ b/cmd/frostfs-cli/modules/control/rebuild_shards.go
@@ -0,0 +1,88 @@
+package control
+
+import (
+	"fmt"
+
+	rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
+	commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
+	"github.com/mr-tron/base58"
+	"github.com/spf13/cobra"
+)
+
+const (
+	fillPercentFlag = "fill_percent"
+)
+
+var shardsRebuildCmd = &cobra.Command{
+	Use:   "rebuild",
+	Short: "Rebuild shards",
+	Long:  "Rebuild reclaims storage occupied by dead objects and adjusts the storage structure according to the configuration (currently blobovnicza only)",
+	Run:   shardsRebuild,
+}
+
+func shardsRebuild(cmd *cobra.Command, _ []string) {
+	pk := key.Get(cmd)
+
+	req := &control.StartShardRebuildRequest{
+		Body: &control.StartShardRebuildRequest_Body{
+			Shard_ID:          getShardIDList(cmd),
+			TargetFillPercent: getFillPercentValue(cmd),
+			ConcurrencyLimit:  getConcurrencyValue(cmd),
+		},
+	}
+
+	signRequest(cmd, pk, req)
+
+	cli := getClient(cmd, pk)
+
+	var resp *control.StartShardRebuildResponse
+	var err error
+	err = cli.ExecRaw(func(client *rawclient.Client) error {
+		resp, err = control.StartShardRebuild(client, req)
+		return err
+	})
+	commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
+
+	verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
+
+	var success, failed uint
+	for _, res := range resp.GetBody().GetResults() {
+		if res.GetSuccess() {
+			success++
+			cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID()))
+		} else {
+			failed++
+			cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError())
+		}
+	}
+	cmd.Printf("Total: %d success, %d failed\n", success, failed)
+}
+
+func getFillPercentValue(cmd *cobra.Command) uint32 {
+	v, _ := cmd.Flags().GetUint32(fillPercentFlag)
+	if v <= 0 || v > 100 {
+		commonCmd.ExitOnErr(cmd, "invalid fill_percent value", fmt.Errorf("fill_percent value must be in range (0, 100], current value: %d", v))
+	}
+	return v
+}
+
+func getConcurrencyValue(cmd *cobra.Command) uint32 {
+	v, _ := cmd.Flags().GetUint32(concurrencyFlag)
+	if v <= 0 || v > 10000 {
+		commonCmd.ExitOnErr(cmd, "invalid concurrency value", fmt.Errorf("concurrency value must be in range (0, 10 000], current value: %d", v))
+	}
+	return v
+}
+
+func initControlShardRebuildCmd() {
+	initControlFlags(shardsRebuildCmd)
+
+	flags := shardsRebuildCmd.Flags()
+	flags.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
+	flags.Bool(shardAllFlag, false, "Process all shards")
+	flags.Uint32(fillPercentFlag, 80, "Target fill percent to reclaim space")
+	flags.Uint32(concurrencyFlag, 20, "Maximum count of concurrently rebuilding files")
+	shardsRebuildCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
+}
diff --git a/cmd/frostfs-cli/modules/control/shards.go b/cmd/frostfs-cli/modules/control/shards.go
index d8198c426..d6c2a0b9b 100644
--- a/cmd/frostfs-cli/modules/control/shards.go
+++ b/cmd/frostfs-cli/modules/control/shards.go
@@ -28,4 +28,5 @@ func initControlShardsCmd() {
 	initControlDoctorCmd()
 	initControlShardsWritecacheCmd()
 	initControlShardsDetachCmd()
+	initControlShardRebuildCmd()
 }
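Note that cobra enforces `MarkFlagsMutuallyExclusive` through the command on which it is called, which is why the call in `initControlShardRebuildCmd` must target `shardsRebuildCmd` itself. A minimal, self-contained sketch of the mechanism; the flag names here are illustrative, not the CLI's real ones:

```go
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use:           "rebuild",
		SilenceUsage:  true,
		SilenceErrors: true,
		RunE: func(cmd *cobra.Command, _ []string) error {
			return nil
		},
	}

	// The flags must be registered on the same command that declares
	// them mutually exclusive; otherwise the exclusion is not enforced.
	cmd.Flags().StringSlice("id", nil, "list of shard IDs")
	cmd.Flags().Bool("all", false, "process all shards")
	cmd.MarkFlagsMutuallyExclusive("id", "all")

	// Passing both flags makes Execute return an error instead of running.
	cmd.SetArgs([]string{"--id", "abc", "--all"})
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}
```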
diff --git a/pkg/local_object_storage/engine/rebuild.go b/pkg/local_object_storage/engine/rebuild.go
new file mode 100644
index 000000000..3970aae89
--- /dev/null
+++ b/pkg/local_object_storage/engine/rebuild.go
@@ -0,0 +1,90 @@
+package engine
+
+import (
+	"context"
+	"sync"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
+	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
+	"golang.org/x/sync/errgroup"
+)
+
+type RebuildPrm struct {
+	ShardIDs          []*shard.ID
+	ConcurrencyLimit  uint32
+	TargetFillPercent uint32
+}
+
+type ShardRebuildResult struct {
+	ShardID  *shard.ID
+	Success  bool
+	ErrorMsg string
+}
+
+type RebuildRes struct {
+	ShardResults []ShardRebuildResult
+}
+
+func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes, error) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Rebuild",
+		trace.WithAttributes(
+			attribute.Int("shard_id_count", len(prm.ShardIDs)),
+			attribute.Int64("target_fill_percent", int64(prm.TargetFillPercent)),
+			attribute.Int64("concurrency_limit", int64(prm.ConcurrencyLimit)),
+		))
+	defer span.End()
+
+	res := RebuildRes{
+		ShardResults: make([]ShardRebuildResult, 0, len(prm.ShardIDs)),
+	}
+	resGuard := &sync.Mutex{}
+
+	limiter := newRebuildLimiter(prm.ConcurrencyLimit)
+
+	eg, egCtx := errgroup.WithContext(ctx)
+	for _, shardID := range prm.ShardIDs {
+		eg.Go(func() error {
+			e.mtx.RLock()
+			sh, ok := e.shards[shardID.String()]
+			e.mtx.RUnlock()
+
+			if !ok {
+				resGuard.Lock()
+				defer resGuard.Unlock()
+				res.ShardResults = append(res.ShardResults, ShardRebuildResult{
+					ShardID:  shardID,
+					ErrorMsg: errShardNotFound.Error(),
+				})
+				return nil
+			}
+
+			err := sh.ScheduleRebuild(egCtx, shard.RebuildPrm{
+				ConcurrencyLimiter: limiter,
+				TargetFillPercent:  prm.TargetFillPercent,
+			})
+
+			resGuard.Lock()
+			defer resGuard.Unlock()
+
+			if err != nil {
+				res.ShardResults = append(res.ShardResults, ShardRebuildResult{
+					ShardID:  shardID,
+					ErrorMsg: err.Error(),
+				})
+			} else {
+				res.ShardResults = append(res.ShardResults, ShardRebuildResult{
+					ShardID: shardID,
+					Success: true,
+				})
+			}
+			return nil
+		})
+	}
+
+	if err := eg.Wait(); err != nil {
+		return RebuildRes{}, err
+	}
+	return res, nil
+}
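Two details of the engine code above are worth calling out. Per-shard failures are recorded in ShardResults rather than returned from the errgroup callback, so one bad shard does not cancel the others. And a single limiter is shared by all shards, capping total rebuild concurrency across the engine. `newRebuildLimiter` and the `shard.RebuildWorkerLimiter` interface it satisfies are defined elsewhere in the tree, not in this patch; a plausible shape is a counting semaphore over a buffered channel. The sketch below assumes an interface with `AcquireWorkSlot`/`ReleaseWorkSlot` methods and is illustrative only:

```go
package main

import (
	"context"
	"fmt"
)

// rebuildLimiter is a hypothetical stand-in for the engine's
// newRebuildLimiter: a counting semaphore over a buffered channel.
type rebuildLimiter struct {
	semaphore chan struct{}
}

func newRebuildLimiter(workersCount uint32) *rebuildLimiter {
	return &rebuildLimiter{semaphore: make(chan struct{}, workersCount)}
}

// AcquireWorkSlot blocks until a slot frees up or the context is done,
// so a cancelled rebuild does not leave workers queued forever.
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
	select {
	case l.semaphore <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// ReleaseWorkSlot frees a slot for the next waiting worker.
func (l *rebuildLimiter) ReleaseWorkSlot() {
	<-l.semaphore
}

func main() {
	limiter := newRebuildLimiter(2)
	ctx := context.Background()
	for i := 0; i < 2; i++ {
		if err := limiter.AcquireWorkSlot(ctx); err != nil {
			panic(err)
		}
	}
	limiter.ReleaseWorkSlot() // a third worker may now proceed
	fmt.Println("slots cycled without blocking")
}
```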
diff --git a/pkg/local_object_storage/shard/rebuild.go b/pkg/local_object_storage/shard/rebuild.go
index 998fcf08b..f8051999e 100644
--- a/pkg/local_object_storage/shard/rebuild.go
+++ b/pkg/local_object_storage/shard/rebuild.go
@@ -10,7 +10,10 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
+	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 )
 
@@ -171,3 +174,33 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres
 	_, err := u.mb.UpdateStorageID(ctx, prm)
 	return err
 }
+
+type RebuildPrm struct {
+	ConcurrencyLimiter RebuildWorkerLimiter
+	TargetFillPercent  uint32
+}
+
+func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error {
+	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ScheduleRebuild",
+		trace.WithAttributes(
+			attribute.String("shard_id", s.ID().String()),
+			attribute.Int64("target_fill_percent", int64(p.TargetFillPercent)),
+		))
+	defer span.End()
+
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode.ReadOnly() {
+		return ErrReadOnlyMode
+	}
+	if s.info.Mode.NoMetabase() {
+		return ErrDegradedMode
+	}
+
+	return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, common.RebuildAction{
+		SchemaChange:     true,
+		FillPercent:      true,
+		FillPercentValue: int(p.TargetFillPercent),
+	})
+}
diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go
index a90e58a65..80aece008 100644
--- a/pkg/services/control/rpc.go
+++ b/pkg/services/control/rpc.go
@@ -30,6 +30,7 @@ const (
 	rpcSealWriteCache            = "SealWriteCache"
 	rpcListTargetsLocalOverrides = "ListTargetsLocalOverrides"
 	rpcDetachShards              = "DetachShards"
+	rpcStartShardRebuild         = "StartShardRebuild"
 )
 
 // HealthCheck executes ControlService.HealthCheck RPC.
@@ -361,3 +362,16 @@ func DetachShards(
 
 	return wResp.message, nil
 }
+
+// StartShardRebuild executes ControlService.StartShardRebuild RPC.
+func StartShardRebuild(cli *client.Client, req *StartShardRebuildRequest, opts ...client.CallOption) (*StartShardRebuildResponse, error) {
+	wResp := newResponseWrapper[StartShardRebuildResponse]()
+	wReq := &requestWrapper{m: req}
+
+	err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcStartShardRebuild), wReq, wResp, opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	return wResp.message, nil
+}
diff --git a/pkg/services/control/server/rebuild.go b/pkg/services/control/server/rebuild.go
new file mode 100644
index 000000000..6ddfb8bf4
--- /dev/null
+++ b/pkg/services/control/server/rebuild.go
@@ -0,0 +1,59 @@
+package control
+
+import (
+	"context"
+	"fmt"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ctrlmessage"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+func (s *Server) StartShardRebuild(ctx context.Context, req *control.StartShardRebuildRequest) (*control.StartShardRebuildResponse, error) {
+	err := s.isValidRequest(req)
+	if err != nil {
+		return nil, status.Error(codes.PermissionDenied, err.Error())
+	}
+
+	if req.GetBody().GetConcurrencyLimit() == 0 || req.GetBody().GetConcurrencyLimit() > 10000 {
+		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("concurrency limit must be in range (0; 10 000], current value %d", req.GetBody().GetConcurrencyLimit()))
+	}
+
+	if req.GetBody().GetTargetFillPercent() == 0 || req.GetBody().GetTargetFillPercent() > 100 {
+		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("fill percent must be in range (0; 100], current value %d", req.GetBody().GetTargetFillPercent()))
+	}
+
+	prm := engine.RebuildPrm{
+		ShardIDs:          s.getShardIDList(req.GetBody().GetShard_ID()),
+		ConcurrencyLimit:  req.GetBody().GetConcurrencyLimit(),
+		TargetFillPercent: req.GetBody().GetTargetFillPercent(),
+	}
+
+	res, err := s.s.Rebuild(ctx, prm)
+	if err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+
+	resp := &control.StartShardRebuildResponse{Body: &control.StartShardRebuildResponse_Body{}}
+	for _, r := range res.ShardResults {
+		if r.Success {
+			resp.Body.Results = append(resp.GetBody().GetResults(), control.StartShardRebuildResponse_Body_Status{
+				Shard_ID: *r.ShardID,
+				Success:  true,
+			})
+		} else {
+			resp.Body.Results = append(resp.GetBody().GetResults(), control.StartShardRebuildResponse_Body_Status{
+				Shard_ID: *r.ShardID,
+				Error:    r.ErrorMsg,
+			})
+		}
+	}
+
+	err = ctrlmessage.Sign(s.key, resp)
+	if err != nil {
+		return nil, err
+	}
+	return resp, nil
+}
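The server repeats the CLI's range checks because the control API can be driven by clients other than frostfs-cli. Both sides accept a fill percent in (0; 100] and a concurrency limit in (0; 10 000], half-open intervals that reject zero. A tiny self-contained illustration; checkRange is a hypothetical helper, the handler above inlines the checks:

```go
package main

import "fmt"

// checkRange mirrors the handler's inline validation:
// a value is accepted iff it lies in the interval (0; upper].
func checkRange(name string, v, upper uint32) error {
	if v == 0 || v > upper {
		return fmt.Errorf("%s must be in range (0; %d], current value %d", name, upper, v)
	}
	return nil
}

func main() {
	fmt.Println(checkRange("fill percent", 0, 100))             // rejected: zero is invalid
	fmt.Println(checkRange("fill percent", 80, 100))            // accepted: the CLI default
	fmt.Println(checkRange("concurrency limit", 20000, 10000))  // rejected: above the cap
}
```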
diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto
index d6639cb48..04994328a 100644
--- a/pkg/services/control/service.proto
+++ b/pkg/services/control/service.proto
@@ -91,6 +91,9 @@ service ControlService {
 
   // DetachShards detaches and closes shards.
   rpc DetachShards(DetachShardsRequest) returns (DetachShardsResponse);
+
+  // StartShardRebuild starts the shard rebuild process.
+  rpc StartShardRebuild(StartShardRebuildRequest) returns (StartShardRebuildResponse);
 }
 
 // Health check request.
@@ -699,3 +702,29 @@ message DetachShardsResponse {
 
   Signature signature = 2;
 }
+
+message StartShardRebuildRequest {
+  message Body {
+    repeated bytes shard_ID = 1;
+    uint32 target_fill_percent = 2;
+    uint32 concurrency_limit = 3;
+  }
+
+  Body body = 1;
+  Signature signature = 2;
+}
+
+message StartShardRebuildResponse {
+  message Body {
+    message Status {
+      bytes shard_ID = 1;
+      bool success = 2;
+      string error = 3;
+    }
+    repeated Status results = 1;
+  }
+
+  Body body = 1;
+
+  Signature signature = 2;
+}
diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go
index eb0d95c64..019cac290 100644
Binary files a/pkg/services/control/service_frostfs.pb.go and b/pkg/services/control/service_frostfs.pb.go differ
diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go
index fa9de974a..f5cfefa85 100644
Binary files a/pkg/services/control/service_grpc.pb.go and b/pkg/services/control/service_grpc.pb.go differ
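On the wire, shard_ID travels as raw bytes; the CLI renders and parses it as base58 via github.com/mr-tron/base58, which the new command already imports. A short round-trip sketch with a made-up ID value:

```go
package main

import (
	"fmt"

	"github.com/mr-tron/base58"
)

func main() {
	// A shard ID as carried in StartShardRebuildRequest_Body.shard_ID
	// (illustrative bytes, not a real shard ID).
	raw := []byte{0x01, 0x02, 0x03, 0x04}

	// Encode for display, as the CLI does when printing per-shard results.
	encoded := base58.Encode(raw)
	fmt.Println(encoded)

	// Decode a shard ID supplied via the CLI flag back into request bytes.
	decoded, err := base58.Decode(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Println(decoded)
}
```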