[#569] cli: Add control shards writecache seal command

It does the same as `control shards flush-writecache --seal`, but
has better name.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-12-27 14:37:22 +03:00
parent 7a9db5bcdd
commit 581887148a
13 changed files with 309 additions and 5 deletions

View file

@ -15,6 +15,7 @@ var flushCacheCmd = &cobra.Command{
Short: "Flush objects from the write-cache to the main storage", Short: "Flush objects from the write-cache to the main storage",
Long: "Flush objects from the write-cache to the main storage", Long: "Flush objects from the write-cache to the main storage",
Run: flushCache, Run: flushCache,
Deprecated: "Flushing objects from writecache to the main storage is performed by writecache automatically. To flush and seal writecache use `frostfs-cli control shards writecache seal`.",
} }
func flushCache(cmd *cobra.Command, _ []string) { func flushCache(cmd *cobra.Command, _ []string) {

View file

@ -17,6 +17,7 @@ func initControlShardsCmd() {
shardsCmd.AddCommand(evacuationShardCmd) shardsCmd.AddCommand(evacuationShardCmd)
shardsCmd.AddCommand(flushCacheCmd) shardsCmd.AddCommand(flushCacheCmd)
shardsCmd.AddCommand(doctorCmd) shardsCmd.AddCommand(doctorCmd)
shardsCmd.AddCommand(writecacheShardCmd)
initControlShardsListCmd() initControlShardsListCmd()
initControlSetShardModeCmd() initControlSetShardModeCmd()
@ -24,4 +25,5 @@ func initControlShardsCmd() {
initControlEvacuationShardCmd() initControlEvacuationShardCmd()
initControlFlushCacheCmd() initControlFlushCacheCmd()
initControlDoctorCmd() initControlDoctorCmd()
initControlShardsWritecacheCmd()
} }

View file

@ -0,0 +1,73 @@
package control
import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"github.com/mr-tron/base58"
"github.com/spf13/cobra"
)
var writecacheShardCmd = &cobra.Command{
Use: "writecache",
Short: "Operations with storage node's write-cache",
Long: "Operations with storage node's write-cache",
}
var sealWritecacheShardCmd = &cobra.Command{
Use: "seal",
Short: "Flush objects from write-cache and move write-cache to degraded read only mode.",
Long: "Flush all the objects from the write-cache to the main storage and move the write-cache to the degraded read only mode: write-cache will be empty and no objects will be put in it.",
Run: sealWritecache,
}
func sealWritecache(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{
Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors,
}}
signRequest(cmd, pk, req)
cli := getClient(cmd, pk)
var resp *control.SealWriteCacheResponse
var err error
err = cli.ExecRaw(func(client *client.Client) error {
resp, err = control.SealWriteCache(client, req)
return err
})
commonCmd.ExitOnErr(cmd, "rpc error: %w", err)
verifyResponse(cmd, resp.GetSignature(), resp.GetBody())
var success, failed uint
for _, res := range resp.GetBody().GetResults() {
if res.GetSuccess() {
success++
cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID()))
} else {
failed++
cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError())
}
}
cmd.Printf("Total: %d success, %d failed\n", success, failed)
}
func initControlShardsWritecacheCmd() {
writecacheShardCmd.AddCommand(sealWritecacheShardCmd)
initControlFlags(sealWritecacheShardCmd)
ff := sealWritecacheShardCmd.Flags()
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
ff.Bool(shardAllFlag, false, "Process all shards")
ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
sealWritecacheShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}

View file

@ -2,6 +2,7 @@ package engine
import ( import (
"context" "context"
"sync"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@ -11,6 +12,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
) )
// FlushWriteCachePrm groups the parameters of FlushWriteCache operation. // FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
@ -44,7 +46,7 @@ type FlushWriteCacheRes struct{}
func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) { func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache",
trace.WithAttributes( trace.WithAttributes(
attribute.String("shard)id", p.shardID.String()), attribute.String("shard_id", p.shardID.String()),
attribute.Bool("ignore_errors", p.ignoreErrors), attribute.Bool("ignore_errors", p.ignoreErrors),
attribute.Bool("seal", p.seal), attribute.Bool("seal", p.seal),
)) ))
@ -65,6 +67,79 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm) return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
} }
type SealWriteCachePrm struct {
ShardIDs []*shard.ID
IgnoreErrors bool
}
type ShardSealResult struct {
ShardID *shard.ID
Success bool
ErrorMsg string
}
type SealWriteCacheRes struct {
ShardResults []ShardSealResult
}
// SealWriteCache flushed all data to blobstore and moves write-cache to degraded read only mode.
func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePrm) (SealWriteCacheRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.SealWriteCache",
trace.WithAttributes(
attribute.Int("shard_id_count", len(prm.ShardIDs)),
attribute.Bool("ignore_errors", prm.IgnoreErrors),
))
defer span.End()
res := SealWriteCacheRes{
ShardResults: make([]ShardSealResult, 0, len(prm.ShardIDs)),
}
resGuard := &sync.Mutex{}
eg, egCtx := errgroup.WithContext(ctx)
for _, shardID := range prm.ShardIDs {
shardID := shardID
eg.Go(func() error {
e.mtx.RLock()
sh, ok := e.shards[shardID.String()]
e.mtx.RUnlock()
if !ok {
resGuard.Lock()
defer resGuard.Unlock()
res.ShardResults = append(res.ShardResults, ShardSealResult{
ShardID: shardID,
ErrorMsg: errShardNotFound.Error(),
})
return nil
}
err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors})
resGuard.Lock()
defer resGuard.Unlock()
if err != nil {
res.ShardResults = append(res.ShardResults, ShardSealResult{
ShardID: shardID,
ErrorMsg: err.Error(),
})
} else {
res.ShardResults = append(res.ShardResults, ShardSealResult{
ShardID: shardID,
Success: true,
})
}
return nil
})
}
if err := eg.Wait(); err != nil {
return SealWriteCacheRes{}, err
}
return res, nil
}
type writeCacheMetrics struct { type writeCacheMetrics struct {
shardID string shardID string
metrics metrics.WriteCacheMetrics metrics metrics.WriteCacheMetrics

View file

@ -56,3 +56,33 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal) return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
} }
type SealWriteCachePrm struct {
IgnoreErrors bool
}
// SealWriteCache flushes all data from the write-cache and moves it to degraded read only mode.
func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.SealWriteCache",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.Bool("ignore_errors", p.IgnoreErrors),
))
defer span.End()
if !s.hasWriteCache() {
return errWriteCacheDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.writeCache.Seal(ctx, p.IgnoreErrors)
}

View file

@ -0,0 +1,28 @@
package writecache
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
func (c *cache) Seal(ctx context.Context, ignoreErrors bool) error {
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Seal",
trace.WithAttributes(
attribute.Bool("ignore_errors", ignoreErrors),
))
defer span.End()
c.modeMtx.Lock()
defer c.modeMtx.Unlock()
// flush will be done by setMode
err := c.setMode(ctx, mode.DegradedReadOnly, ignoreErrors)
if err == nil {
c.metrics.SetMode(mode.DegradedReadOnly)
}
return err
}

View file

@ -36,6 +36,7 @@ type Cache interface {
SetLogger(*logger.Logger) SetLogger(*logger.Logger)
DumpInfo() Info DumpInfo() Info
Flush(context.Context, bool, bool) error Flush(context.Context, bool, bool) error
Seal(context.Context, bool) error
Init() error Init() error
Open(ctx context.Context, readOnly bool) error Open(ctx context.Context, readOnly bool) error

View file

@ -24,6 +24,7 @@ const (
rpcGetChainLocalOverride = "GetChainLocalOverride" rpcGetChainLocalOverride = "GetChainLocalOverride"
rpcListChainLocalOverrides = "ListChainLocalOverrides" rpcListChainLocalOverrides = "ListChainLocalOverrides"
rpcRemoveChainLocalOverride = "RemoveChainLocalOverride" rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
rpcSealWriteCache = "SealWriteCache"
) )
// HealthCheck executes ControlService.HealthCheck RPC. // HealthCheck executes ControlService.HealthCheck RPC.
@ -264,3 +265,16 @@ func RemoveChainLocalOverride(cli *client.Client, req *RemoveChainLocalOverrideR
return wResp.message, nil return wResp.message, nil
} }
// SealWriteCache executes ControlService.SealWriteCache RPC.
func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...client.CallOption) (*SealWriteCacheResponse, error) {
wResp := newResponseWrapper[SealWriteCacheResponse]()
wReq := &requestWrapper{m: req}
err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcSealWriteCache), wReq, wResp, opts...)
if err != nil {
return nil, err
}
return wResp.message, nil
}

View file

@ -0,0 +1,48 @@
package control
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCacheRequest) (*control.SealWriteCacheResponse, error) {
err := s.isValidRequest(req)
if err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
prm := engine.SealWriteCachePrm{
ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
}
res, err := s.s.SealWriteCache(ctx, prm)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
resp := &control.SealWriteCacheResponse{Body: &control.SealWriteCacheResponse_Body{}}
for _, r := range res.ShardResults {
if r.Success {
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
Shard_ID: *r.ShardID,
Success: true,
})
} else {
resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
Shard_ID: *r.ShardID,
Error: r.ErrorMsg,
})
}
}
err = SignMessage(s.key, resp)
if err != nil {
return nil, err
}
return resp, nil
}

Binary file not shown.

View file

@ -56,6 +56,9 @@ service ControlService {
// Remove local access policy engine overrides stored in the node by chaind id. // Remove local access policy engine overrides stored in the node by chaind id.
rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse); rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse);
// Flush objects from write-cache and move it to degraded read only mode.
rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
} }
// Health check request. // Health check request.
@ -525,3 +528,32 @@ message RemoveChainLocalOverrideResponse {
Signature signature = 2; Signature signature = 2;
} }
message SealWriteCacheRequest {
// Request body structure.
message Body {
// ID of the shard.
repeated bytes shard_ID = 1;
// Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2;
}
Body body = 1;
Signature signature = 2;
}
message SealWriteCacheResponse {
message Body {
message Status {
bytes shard_ID = 1;
bool success = 2;
string error = 3;
}
repeated Status results = 1;
}
Body body = 1;
Signature signature = 2;
}

Binary file not shown.

Binary file not shown.