Allow to seal writecache after flush #886
13 changed files with 309 additions and 5 deletions
@@ -15,6 +15,7 @@ var flushCacheCmd = &cobra.Command{
	Short:      "Flush objects from the write-cache to the main storage",
	Long:       "Flush objects from the write-cache to the main storage",
	Run:        flushCache,
	Deprecated: "Flushing objects from writecache to the main storage is performed by writecache automatically. To flush and seal writecache use `frostfs-cli control shards writecache seal`.",
}

func flushCache(cmd *cobra.Command, _ []string) {
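Marking the old command as Deprecated rather than deleting it keeps existing scripts working: cobra prints the deprecation notice and then still executes the command's Run handler. A minimal standalone sketch of that mechanism (illustration only, not code from this change; the command name here is invented):

package main

import "github.com/spf13/cobra"

func main() {
	old := &cobra.Command{
		Use:        "flush-cache", // illustrative name, not taken from this PR
		Deprecated: "use `writecache seal` instead", // cobra prints this notice, then still runs the command
		Run: func(cmd *cobra.Command, _ []string) {
			cmd.Println("flushing")
		},
	}
	root := &cobra.Command{Use: "demo"}
	root.AddCommand(old)
	root.SetArgs([]string{"flush-cache"})
	_ = root.Execute()
}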
@@ -17,6 +17,7 @@ func initControlShardsCmd() {
	shardsCmd.AddCommand(evacuationShardCmd)
	shardsCmd.AddCommand(flushCacheCmd)
	shardsCmd.AddCommand(doctorCmd)
	shardsCmd.AddCommand(writecacheShardCmd)

	initControlShardsListCmd()
	initControlSetShardModeCmd()
@@ -24,4 +25,5 @@ func initControlShardsCmd() {
	initControlEvacuationShardCmd()
	initControlFlushCacheCmd()
	initControlDoctorCmd()
	initControlShardsWritecacheCmd()
}
73  cmd/frostfs-cli/modules/control/writecache.go  Normal file
@@ -0,0 +1,73 @@
package control

import (
	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key"
	commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
	"github.com/mr-tron/base58"
	"github.com/spf13/cobra"
)

var writecacheShardCmd = &cobra.Command{
	Use:   "writecache",
	Short: "Operations with storage node's write-cache",
	Long:  "Operations with storage node's write-cache",
}

var sealWritecacheShardCmd = &cobra.Command{
	Use:   "seal",
	Short: "Flush objects from write-cache and move write-cache to degraded read only mode.",
	Long:  "Flush all the objects from the write-cache to the main storage and move the write-cache to the degraded read only mode: write-cache will be empty and no objects will be put in it.",
	Run:   sealWritecache,
}

func sealWritecache(cmd *cobra.Command, _ []string) {
	pk := key.Get(cmd)

	ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)

	req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{
		Shard_ID:     getShardIDList(cmd),
		IgnoreErrors: ignoreErrors,
	}}

	signRequest(cmd, pk, req)

	cli := getClient(cmd, pk)

	var resp *control.SealWriteCacheResponse
	var err error
	err = cli.ExecRaw(func(client *client.Client) error {
		resp, err = control.SealWriteCache(client, req)
		return err
	})
	commonCmd.ExitOnErr(cmd, "rpc error: %w", err)

	verifyResponse(cmd, resp.GetSignature(), resp.GetBody())

	var success, failed uint
	for _, res := range resp.GetBody().GetResults() {
		if res.GetSuccess() {
			success++
			cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID()))
		} else {
			failed++
			cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError())
		}
	}
	cmd.Printf("Total: %d success, %d failed\n", success, failed)
}

func initControlShardsWritecacheCmd() {
	writecacheShardCmd.AddCommand(sealWritecacheShardCmd)

	initControlFlags(sealWritecacheShardCmd)

	ff := sealWritecacheShardCmd.Flags()
	ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
	ff.Bool(shardAllFlag, false, "Process all shards")
	ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")

	sealWritecacheShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
}
@@ -2,6 +2,7 @@ package engine

import (
	"context"
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
@@ -11,6 +12,7 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/sync/errgroup"
)

// FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
@@ -44,7 +46,7 @@ type FlushWriteCacheRes struct{}
func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache",
		trace.WithAttributes(
-			attribute.String("shard)id", p.shardID.String()),
+			attribute.String("shard_id", p.shardID.String()),
			attribute.Bool("ignore_errors", p.ignoreErrors),
			attribute.Bool("seal", p.seal),
		))
@@ -65,6 +67,79 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
	return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
}

type SealWriteCachePrm struct {
	ShardIDs     []*shard.ID
	IgnoreErrors bool
}

type ShardSealResult struct {
	ShardID  *shard.ID
	Success  bool
	ErrorMsg string
}

type SealWriteCacheRes struct {
	ShardResults []ShardSealResult
}

// SealWriteCache flushes all data to the blobstore and moves the write-cache to degraded read only mode.
func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePrm) (SealWriteCacheRes, error) {
	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.SealWriteCache",
		trace.WithAttributes(
			attribute.Int("shard_id_count", len(prm.ShardIDs)),
			attribute.Bool("ignore_errors", prm.IgnoreErrors),
		))
	defer span.End()

	res := SealWriteCacheRes{
		ShardResults: make([]ShardSealResult, 0, len(prm.ShardIDs)),
	}
	resGuard := &sync.Mutex{}

	eg, egCtx := errgroup.WithContext(ctx)
	for _, shardID := range prm.ShardIDs {
		shardID := shardID
		eg.Go(func() error {
			e.mtx.RLock()
			sh, ok := e.shards[shardID.String()]
			e.mtx.RUnlock()

			if !ok {
				resGuard.Lock()
				defer resGuard.Unlock()
				res.ShardResults = append(res.ShardResults, ShardSealResult{
					ShardID:  shardID,
					ErrorMsg: errShardNotFound.Error(),
				})
				return nil
			}

			err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors})

			resGuard.Lock()
			defer resGuard.Unlock()

			if err != nil {
				res.ShardResults = append(res.ShardResults, ShardSealResult{
					ShardID:  shardID,
					ErrorMsg: err.Error(),
				})
			} else {
				res.ShardResults = append(res.ShardResults, ShardSealResult{
					ShardID: shardID,
					Success: true,
				})
			}
			return nil
		})
	}

	if err := eg.Wait(); err != nil {
		return SealWriteCacheRes{}, err
	}
	return res, nil
}

type writeCacheMetrics struct {
	shardID string
	metrics metrics.WriteCacheMetrics
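Note the error-handling split in SealWriteCache above: each per-shard goroutine records its outcome in ShardResults and returns nil, so errors do not propagate through eg.Wait(); a caller has to walk the per-shard results itself. A minimal caller sketch, assuming this repository's engine and shard packages (the package and function names here are illustrative only):

package sealutil

import (
	"context"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
)

// sealShards seals the write-caches of the given shards and reports every
// shard that could not be sealed.
func sealShards(ctx context.Context, e *engine.StorageEngine, ids []*shard.ID) error {
	res, err := e.SealWriteCache(ctx, engine.SealWriteCachePrm{
		ShardIDs:     ids,
		IgnoreErrors: true,
	})
	if err != nil {
		return err // not expected with the current engine code, kept for safety
	}
	for _, r := range res.ShardResults {
		if !r.Success {
			fmt.Printf("shard %s not sealed: %s\n", r.ShardID.String(), r.ErrorMsg)
		}
	}
	return nil
}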
@@ -56,3 +56,33 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error

	return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal)
}

type SealWriteCachePrm struct {
	IgnoreErrors bool
}

// SealWriteCache flushes all data from the write-cache and moves it to degraded read only mode.
func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.SealWriteCache",
		trace.WithAttributes(
			attribute.String("shard_id", s.ID().String()),
			attribute.Bool("ignore_errors", p.IgnoreErrors),
		))
	defer span.End()

	if !s.hasWriteCache() {
		return errWriteCacheDisabled
	}

	s.m.RLock()
	defer s.m.RUnlock()

	if s.info.Mode.ReadOnly() {
		return ErrReadOnlyMode
	}
	if s.info.Mode.NoMetabase() {
		return ErrDegradedMode
	}

	return s.writeCache.Seal(ctx, p.IgnoreErrors)
}
28  pkg/local_object_storage/writecache/seal.go  Normal file
@@ -0,0 +1,28 @@
package writecache

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

func (c *cache) Seal(ctx context.Context, ignoreErrors bool) error {
	ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Seal",
		trace.WithAttributes(
			attribute.Bool("ignore_errors", ignoreErrors),
		))
	defer span.End()

	c.modeMtx.Lock()
	defer c.modeMtx.Unlock()

	// flush will be done by setMode
	err := c.setMode(ctx, mode.DegradedReadOnly, ignoreErrors)
	if err == nil {
		c.metrics.SetMode(mode.DegradedReadOnly)
	}
	return err
}
@@ -36,6 +36,7 @@ type Cache interface {
	SetLogger(*logger.Logger)
	DumpInfo() Info
	Flush(context.Context, bool, bool) error
	Seal(context.Context, bool) error

	Init() error
	Open(ctx context.Context, readOnly bool) error
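The Cache interface now exposes two related entry points: Flush(ctx, ignoreErrors, seal), whose third boolean is the seal flag forwarded from Shard.FlushWriteCache, and the dedicated Seal(ctx, ignoreErrors) added above, which flushes as part of switching the cache into degraded read-only mode. A small sketch of calling the new method through the interface, assuming this repository's writecache package (package and helper names are illustrative):

package sealutil

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
)

// sealCache seals any Cache implementation; after a nil return the cache is
// expected to be empty and in degraded read-only mode (see seal.go above).
func sealCache(ctx context.Context, c writecache.Cache, ignoreErrors bool) error {
	return c.Seal(ctx, ignoreErrors)
}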
@@ -24,6 +24,7 @@ const (
	rpcGetChainLocalOverride    = "GetChainLocalOverride"
	rpcListChainLocalOverrides  = "ListChainLocalOverrides"
	rpcRemoveChainLocalOverride = "RemoveChainLocalOverride"
	rpcSealWriteCache           = "SealWriteCache"
)

// HealthCheck executes ControlService.HealthCheck RPC.
@@ -264,3 +265,16 @@ func RemoveChainLocalOverride(cli *client.Client, req *RemoveChainLocalOverrideR

	return wResp.message, nil
}

// SealWriteCache executes ControlService.SealWriteCache RPC.
func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...client.CallOption) (*SealWriteCacheResponse, error) {
	wResp := newResponseWrapper[SealWriteCacheResponse]()
	wReq := &requestWrapper{m: req}

	err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcSealWriteCache), wReq, wResp, opts...)
	if err != nil {
		return nil, err
	}

	return wResp.message, nil
}
48  pkg/services/control/server/seal_writecache.go  Normal file
@@ -0,0 +1,48 @@
package control

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCacheRequest) (*control.SealWriteCacheResponse, error) {
	err := s.isValidRequest(req)
	if err != nil {
		return nil, status.Error(codes.PermissionDenied, err.Error())
	}

	prm := engine.SealWriteCachePrm{
		ShardIDs:     s.getShardIDList(req.GetBody().GetShard_ID()),
		IgnoreErrors: req.GetBody().GetIgnoreErrors(),
	}

	res, err := s.s.SealWriteCache(ctx, prm)
	if err != nil {
		return nil, status.Error(codes.Internal, err.Error())
	}

	resp := &control.SealWriteCacheResponse{Body: &control.SealWriteCacheResponse_Body{}}
	for _, r := range res.ShardResults {
		if r.Success {
			resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
				Shard_ID: *r.ShardID,
				Success:  true,
			})
		} else {
			resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{
				Shard_ID: *r.ShardID,
				Error:    r.ErrorMsg,
			})
		}
	}

	err = SignMessage(s.key, resp)
	if err != nil {
		return nil, err
	}
	return resp, nil
}
BIN  pkg/services/control/service.pb.go  generated  (binary file not shown)
@@ -56,6 +56,9 @@ service ControlService {

  // Remove local access policy engine overrides stored in the node by chain id.
  rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse);

  // Flush objects from write-cache and move it to degraded read only mode.
  rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse);
}

// Health check request.
@@ -525,3 +528,32 @@ message RemoveChainLocalOverrideResponse {

  Signature signature = 2;
}

message SealWriteCacheRequest {
  // Request body structure.
  message Body {
    // ID of the shard.
    repeated bytes shard_ID = 1;

    // Flag indicating whether object read errors should be ignored.
    bool ignore_errors = 2;
  }

  Body body = 1;
  Signature signature = 2;
}

message SealWriteCacheResponse {
  message Body {
    message Status {
      bytes shard_ID = 1;
      bool success = 2;
      string error = 3;
    }
    repeated Status results = 1;
  }

  Body body = 1;

  Signature signature = 2;
}
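On the wire, shard_ID is a list of raw byte identifiers, while operators deal with base58 strings: the CLI flag takes base58-encoded IDs and the output re-encodes them with base58.Encode. A sketch of building a request body for a single shard given in base58, assuming the generated control package and the mr-tron/base58 module used elsewhere in this change (package and helper names are illustrative; signing happens separately, as in the CLI code above):

package sealutil

import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
	"github.com/mr-tron/base58"
)

// requestForShard decodes one base58 shard ID and wraps it into the body of a
// SealWriteCacheRequest; the request still has to be signed before sending.
func requestForShard(shardIDBase58 string, ignoreErrors bool) (*control.SealWriteCacheRequest, error) {
	raw, err := base58.Decode(shardIDBase58)
	if err != nil {
		return nil, err
	}
	return &control.SealWriteCacheRequest{
		Body: &control.SealWriteCacheRequest_Body{
			Shard_ID:     [][]byte{raw},
			IgnoreErrors: ignoreErrors,
		},
	}, nil
}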
BIN  pkg/services/control/service_frostfs.pb.go  generated  (binary file not shown)
BIN  pkg/services/control/service_grpc.pb.go  generated  (binary file not shown)