diff --git a/cmd/frostfs-cli/modules/control/flush_cache.go b/cmd/frostfs-cli/modules/control/flush_cache.go index 7f632e9fc..541961903 100644 --- a/cmd/frostfs-cli/modules/control/flush_cache.go +++ b/cmd/frostfs-cli/modules/control/flush_cache.go @@ -11,10 +11,11 @@ import ( const sealFlag = "seal" var flushCacheCmd = &cobra.Command{ - Use: "flush-cache", - Short: "Flush objects from the write-cache to the main storage", - Long: "Flush objects from the write-cache to the main storage", - Run: flushCache, + Use: "flush-cache", + Short: "Flush objects from the write-cache to the main storage", + Long: "Flush objects from the write-cache to the main storage", + Run: flushCache, + Deprecated: "Flushing objects from writecache to the main storage is performed by writecache automatically. To flush and seal writecache use `frostfs-cli control shards writecache seal`.", } func flushCache(cmd *cobra.Command, _ []string) { diff --git a/cmd/frostfs-cli/modules/control/shards.go b/cmd/frostfs-cli/modules/control/shards.go index 6208c560b..6d3ef420c 100644 --- a/cmd/frostfs-cli/modules/control/shards.go +++ b/cmd/frostfs-cli/modules/control/shards.go @@ -17,6 +17,7 @@ func initControlShardsCmd() { shardsCmd.AddCommand(evacuationShardCmd) shardsCmd.AddCommand(flushCacheCmd) shardsCmd.AddCommand(doctorCmd) + shardsCmd.AddCommand(writecacheShardCmd) initControlShardsListCmd() initControlSetShardModeCmd() @@ -24,4 +25,5 @@ func initControlShardsCmd() { initControlEvacuationShardCmd() initControlFlushCacheCmd() initControlDoctorCmd() + initControlShardsWritecacheCmd() } diff --git a/cmd/frostfs-cli/modules/control/writecache.go b/cmd/frostfs-cli/modules/control/writecache.go new file mode 100644 index 000000000..abc4ed2e6 --- /dev/null +++ b/cmd/frostfs-cli/modules/control/writecache.go @@ -0,0 +1,73 @@ +package control + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-cli/internal/key" + commonCmd "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/internal/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "github.com/mr-tron/base58" + "github.com/spf13/cobra" +) + +var writecacheShardCmd = &cobra.Command{ + Use: "writecache", + Short: "Operations with storage node's write-cache", + Long: "Operations with storage node's write-cache", +} + +var sealWritecacheShardCmd = &cobra.Command{ + Use: "seal", + Short: "Flush objects from write-cache and move write-cache to degraded read only mode.", + Long: "Flush all the objects from the write-cache to the main storage and move the write-cache to the degraded read only mode: write-cache will be empty and no objects will be put in it.", + Run: sealWritecache, +} + +func sealWritecache(cmd *cobra.Command, _ []string) { + pk := key.Get(cmd) + + ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) + + req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{ + Shard_ID: getShardIDList(cmd), + IgnoreErrors: ignoreErrors, + }} + + signRequest(cmd, pk, req) + + cli := getClient(cmd, pk) + + var resp *control.SealWriteCacheResponse + var err error + err = cli.ExecRaw(func(client *client.Client) error { + resp, err = control.SealWriteCache(client, req) + return err + }) + commonCmd.ExitOnErr(cmd, "rpc error: %w", err) + + verifyResponse(cmd, resp.GetSignature(), resp.GetBody()) + + var success, failed uint + for _, res := range resp.GetBody().GetResults() { + if res.GetSuccess() { + success++ + cmd.Printf("Shard %s: OK\n", base58.Encode(res.GetShard_ID())) + } else { + failed++ + cmd.Printf("Shard %s: failed with error %q\n", base58.Encode(res.GetShard_ID()), res.GetError()) + } + } + cmd.Printf("Total: %d success, %d failed\n", success, failed) +} + +func initControlShardsWritecacheCmd() { + writecacheShardCmd.AddCommand(sealWritecacheShardCmd) + + initControlFlags(sealWritecacheShardCmd) + + ff := sealWritecacheShardCmd.Flags() + ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") + ff.Bool(shardAllFlag, false, "Process all shards") + ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects") + + sealWritecacheShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag) +} diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go index d92a86f5d..0eca018f8 100644 --- a/pkg/local_object_storage/engine/writecache.go +++ b/pkg/local_object_storage/engine/writecache.go @@ -2,6 +2,7 @@ package engine import ( "context" + "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" @@ -11,6 +12,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" ) // FlushWriteCachePrm groups the parameters of FlushWriteCache operation. @@ -44,7 +46,7 @@ type FlushWriteCacheRes struct{} func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) (FlushWriteCacheRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.FlushWriteCache", trace.WithAttributes( - attribute.String("shard)id", p.shardID.String()), + attribute.String("shard_id", p.shardID.String()), attribute.Bool("ignore_errors", p.ignoreErrors), attribute.Bool("seal", p.seal), )) @@ -65,6 +67,79 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm) } +type SealWriteCachePrm struct { + ShardIDs []*shard.ID + IgnoreErrors bool +} + +type ShardSealResult struct { + ShardID *shard.ID + Success bool + ErrorMsg string +} + +type SealWriteCacheRes struct { + ShardResults []ShardSealResult +} + +// SealWriteCache flushed all data to blobstore and moves write-cache to degraded read only mode. +func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePrm) (SealWriteCacheRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.SealWriteCache", + trace.WithAttributes( + attribute.Int("shard_id_count", len(prm.ShardIDs)), + attribute.Bool("ignore_errors", prm.IgnoreErrors), + )) + defer span.End() + + res := SealWriteCacheRes{ + ShardResults: make([]ShardSealResult, 0, len(prm.ShardIDs)), + } + resGuard := &sync.Mutex{} + + eg, egCtx := errgroup.WithContext(ctx) + for _, shardID := range prm.ShardIDs { + shardID := shardID + eg.Go(func() error { + e.mtx.RLock() + sh, ok := e.shards[shardID.String()] + e.mtx.RUnlock() + + if !ok { + resGuard.Lock() + defer resGuard.Unlock() + res.ShardResults = append(res.ShardResults, ShardSealResult{ + ShardID: shardID, + ErrorMsg: errShardNotFound.Error(), + }) + return nil + } + + err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors}) + + resGuard.Lock() + defer resGuard.Unlock() + + if err != nil { + res.ShardResults = append(res.ShardResults, ShardSealResult{ + ShardID: shardID, + ErrorMsg: err.Error(), + }) + } else { + res.ShardResults = append(res.ShardResults, ShardSealResult{ + ShardID: shardID, + Success: true, + }) + } + return nil + }) + } + + if err := eg.Wait(); err != nil { + return SealWriteCacheRes{}, err + } + return res, nil +} + type writeCacheMetrics struct { shardID string metrics metrics.WriteCacheMetrics diff --git a/pkg/local_object_storage/shard/writecache.go b/pkg/local_object_storage/shard/writecache.go index 4e57a0497..05e014d29 100644 --- a/pkg/local_object_storage/shard/writecache.go +++ b/pkg/local_object_storage/shard/writecache.go @@ -56,3 +56,33 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error return s.writeCache.Flush(ctx, p.ignoreErrors, p.seal) } + +type SealWriteCachePrm struct { + IgnoreErrors bool +} + +// SealWriteCache flushes all data from the write-cache and moves it to degraded read only mode. +func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.SealWriteCache", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.Bool("ignore_errors", p.IgnoreErrors), + )) + defer span.End() + + if !s.hasWriteCache() { + return errWriteCacheDisabled + } + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.ReadOnly() { + return ErrReadOnlyMode + } + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + + return s.writeCache.Seal(ctx, p.IgnoreErrors) +} diff --git a/pkg/local_object_storage/writecache/seal.go b/pkg/local_object_storage/writecache/seal.go new file mode 100644 index 000000000..be4fec367 --- /dev/null +++ b/pkg/local_object_storage/writecache/seal.go @@ -0,0 +1,28 @@ +package writecache + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +func (c *cache) Seal(ctx context.Context, ignoreErrors bool) error { + ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Seal", + trace.WithAttributes( + attribute.Bool("ignore_errors", ignoreErrors), + )) + defer span.End() + + c.modeMtx.Lock() + defer c.modeMtx.Unlock() + + // flush will be done by setMode + err := c.setMode(ctx, mode.DegradedReadOnly, ignoreErrors) + if err == nil { + c.metrics.SetMode(mode.DegradedReadOnly) + } + return err +} diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 0549c27f7..99fc5390a 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -36,6 +36,7 @@ type Cache interface { SetLogger(*logger.Logger) DumpInfo() Info Flush(context.Context, bool, bool) error + Seal(context.Context, bool) error Init() error Open(ctx context.Context, readOnly bool) error diff --git a/pkg/services/control/rpc.go b/pkg/services/control/rpc.go index b9ec05e71..3fd4d4ae9 100644 --- a/pkg/services/control/rpc.go +++ b/pkg/services/control/rpc.go @@ -24,6 +24,7 @@ const ( rpcGetChainLocalOverride = "GetChainLocalOverride" rpcListChainLocalOverrides = "ListChainLocalOverrides" rpcRemoveChainLocalOverride = "RemoveChainLocalOverride" + rpcSealWriteCache = "SealWriteCache" ) // HealthCheck executes ControlService.HealthCheck RPC. @@ -264,3 +265,16 @@ func RemoveChainLocalOverride(cli *client.Client, req *RemoveChainLocalOverrideR return wResp.message, nil } + +// SealWriteCache executes ControlService.SealWriteCache RPC. +func SealWriteCache(cli *client.Client, req *SealWriteCacheRequest, opts ...client.CallOption) (*SealWriteCacheResponse, error) { + wResp := newResponseWrapper[SealWriteCacheResponse]() + wReq := &requestWrapper{m: req} + + err := client.SendUnary(cli, common.CallMethodInfoUnary(serviceName, rpcSealWriteCache), wReq, wResp, opts...) + if err != nil { + return nil, err + } + + return wResp.message, nil +} diff --git a/pkg/services/control/server/seal_writecache.go b/pkg/services/control/server/seal_writecache.go new file mode 100644 index 000000000..5c39cf484 --- /dev/null +++ b/pkg/services/control/server/seal_writecache.go @@ -0,0 +1,48 @@ +package control + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCacheRequest) (*control.SealWriteCacheResponse, error) { + err := s.isValidRequest(req) + if err != nil { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + + prm := engine.SealWriteCachePrm{ + ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()), + IgnoreErrors: req.GetBody().GetIgnoreErrors(), + } + + res, err := s.s.SealWriteCache(ctx, prm) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + resp := &control.SealWriteCacheResponse{Body: &control.SealWriteCacheResponse_Body{}} + for _, r := range res.ShardResults { + if r.Success { + resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{ + Shard_ID: *r.ShardID, + Success: true, + }) + } else { + resp.Body.Results = append(resp.GetBody().GetResults(), &control.SealWriteCacheResponse_Body_Status{ + Shard_ID: *r.ShardID, + Error: r.ErrorMsg, + }) + } + } + + err = SignMessage(s.key, resp) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index 90e643e29..33ed898ba 100644 Binary files a/pkg/services/control/service.pb.go and b/pkg/services/control/service.pb.go differ diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index e3b507387..34c040f2a 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -56,6 +56,9 @@ service ControlService { // Remove local access policy engine overrides stored in the node by chaind id. rpc RemoveChainLocalOverride (RemoveChainLocalOverrideRequest) returns (RemoveChainLocalOverrideResponse); + + // Flush objects from write-cache and move it to degraded read only mode. + rpc SealWriteCache(SealWriteCacheRequest) returns (SealWriteCacheResponse); } // Health check request. @@ -525,3 +528,32 @@ message RemoveChainLocalOverrideResponse { Signature signature = 2; } + +message SealWriteCacheRequest { + // Request body structure. + message Body { + // ID of the shard. + repeated bytes shard_ID = 1; + + // Flag indicating whether object read errors should be ignored. + bool ignore_errors = 2; + } + + Body body = 1; + Signature signature = 2; +} + +message SealWriteCacheResponse { + message Body { + message Status { + bytes shard_ID = 1; + bool success = 2; + string error = 3; + } + repeated Status results = 1; + } + + Body body = 1; + + Signature signature = 2; +} \ No newline at end of file diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index 3ace081e4..4ce6791f8 100644 Binary files a/pkg/services/control/service_frostfs.pb.go and b/pkg/services/control/service_frostfs.pb.go differ diff --git a/pkg/services/control/service_grpc.pb.go b/pkg/services/control/service_grpc.pb.go index c3976c54a..e09f6750b 100644 Binary files a/pkg/services/control/service_grpc.pb.go and b/pkg/services/control/service_grpc.pb.go differ