diff --git a/cmd/frostfs-cli/modules/control/writecache.go b/cmd/frostfs-cli/modules/control/writecache.go index b725d8471..ffe9009ab 100644 --- a/cmd/frostfs-cli/modules/control/writecache.go +++ b/cmd/frostfs-cli/modules/control/writecache.go @@ -10,6 +10,7 @@ import ( ) const ( + asyncFlag = "async" restoreModeFlag = "restore-mode" shrinkFlag = "shrink" ) @@ -31,12 +32,14 @@ func sealWritecache(cmd *cobra.Command, _ []string) { pk := key.Get(cmd) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) + async, _ := cmd.Flags().GetBool(asyncFlag) restoreMode, _ := cmd.Flags().GetBool(restoreModeFlag) shrink, _ := cmd.Flags().GetBool(shrinkFlag) req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{ Shard_ID: getShardIDList(cmd), IgnoreErrors: ignoreErrors, + Async: async, RestoreMode: restoreMode, Shrink: shrink, }} @@ -77,6 +80,7 @@ func initControlShardsWritecacheCmd() { ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") ff.Bool(shardAllFlag, false, "Process all shards") ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects") + ff.Bool(asyncFlag, false, "Run operation in background") ff.Bool(restoreModeFlag, false, "Restore writecache's mode after sealing") ff.Bool(shrinkFlag, false, "Shrink writecache's internal storage") diff --git a/internal/logs/logs.go b/internal/logs/logs.go index ebb822e1c..78bcd0c0e 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -539,5 +539,8 @@ const ( PolicerCouldNotGetChunk = "could not get EC chunk" PolicerCouldNotGetChunks = "could not get EC chunks" AuditEventLogRecord = "audit event log record" + StartedWritecacheSealAsync = "started writecache seal async" + WritecacheSealCompletedAsync = "writecache seal completed successfully" + FailedToSealWritecacheAsync = "failed to seal writecache async" WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty" ) diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go index 2c5e8cc3a..3e8f387ef 100644 --- a/pkg/local_object_storage/engine/writecache.go +++ b/pkg/local_object_storage/engine/writecache.go @@ -70,6 +70,7 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr type SealWriteCachePrm struct { ShardIDs []*shard.ID IgnoreErrors bool + Async bool RestoreMode bool Shrink bool } @@ -117,7 +118,7 @@ func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePr return nil } - err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors, RestoreMode: prm.RestoreMode, Shrink: prm.Shrink}) + err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors, Async: prm.Async, RestoreMode: prm.RestoreMode, Shrink: prm.Shrink}) resGuard.Lock() defer resGuard.Unlock() diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 90d7afdd4..210744702 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -178,6 +178,7 @@ func (s *Shard) Init(ctx context.Context) error { if !m.NoMetabase() { s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) } + s.writecacheSealCancel.Store(dummyCancel) return nil } @@ -350,6 +351,8 @@ func (s *Shard) Close() error { } if s.hasWriteCache() { + prev := s.writecacheSealCancel.Swap(notInitializedCancel) + prev.cancel() // no need to wait: writecache.Seal and writecache.Close lock the same mutex components = append(components, s.writeCache) } @@ -428,6 +431,9 @@ func (s *Shard) lockExclusive() func() { cancelGC := val.(context.CancelFunc) cancelGC() } + if c := s.writecacheSealCancel.Load(); c != nil { + c.cancel() + } s.m.Lock() s.setModeRequested.Store(false) return s.m.Unlock diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 94f22feb5..93f5354a7 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -37,8 +37,9 @@ type Shard struct { rb *rebuilder - gcCancel atomic.Value - setModeRequested atomic.Bool + gcCancel atomic.Value + setModeRequested atomic.Bool + writecacheSealCancel atomic.Pointer[writecacheSealCanceler] } // Option represents Shard's constructor option. @@ -190,6 +191,7 @@ func New(opts ...Option) *Shard { } s.fillInfo() + s.writecacheSealCancel.Store(notInitializedCancel) return s } diff --git a/pkg/local_object_storage/shard/writecache.go b/pkg/local_object_storage/shard/writecache.go index c29710930..a6de07f03 100644 --- a/pkg/local_object_storage/shard/writecache.go +++ b/pkg/local_object_storage/shard/writecache.go @@ -4,12 +4,24 @@ import ( "context" "errors" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" ) +var ( + dummyCancel = &writecacheSealCanceler{cancel: func() {}} + notInitializedCancel = &writecacheSealCanceler{cancel: func() {}} + errWriteCacheSealing = errors.New("writecache is already sealing or shard is not initialized") +) + +type writecacheSealCanceler struct { + cancel context.CancelFunc +} + // FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation. type FlushWriteCachePrm struct { ignoreErrors bool @@ -60,6 +72,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error type SealWriteCachePrm struct { IgnoreErrors bool + Async bool RestoreMode bool Shrink bool } @@ -78,15 +91,52 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error { return errWriteCacheDisabled } + if p.Async { + ctx = context.WithoutCancel(ctx) + } + ctx, cancel := context.WithCancel(ctx) + canceler := &writecacheSealCanceler{cancel: cancel} + if !s.writecacheSealCancel.CompareAndSwap(dummyCancel, canceler) { + return errWriteCacheSealing + } s.m.RLock() - defer s.m.RUnlock() + cleanup := func() { + s.m.RUnlock() + s.writecacheSealCancel.Store(dummyCancel) + } if s.info.Mode.ReadOnly() { + cleanup() return ErrReadOnlyMode } if s.info.Mode.NoMetabase() { + cleanup() return ErrDegradedMode } - return s.writeCache.Seal(ctx, writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink}) + if !p.Async { + defer cleanup() + } + prm := writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink} + if p.Async { + started := make(chan struct{}) + go func() { + close(started) + defer cleanup() + + s.log.Info(logs.StartedWritecacheSealAsync) + if err := s.writeCache.Seal(ctx, prm); err != nil { + s.log.Warn(logs.FailedToSealWritecacheAsync, zap.Error(err)) + return + } + s.log.Info(logs.WritecacheSealCompletedAsync) + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-started: + return nil + } + } + return s.writeCache.Seal(ctx, prm) } diff --git a/pkg/services/control/server/seal_writecache.go b/pkg/services/control/server/seal_writecache.go index 697b91918..1737677b7 100644 --- a/pkg/services/control/server/seal_writecache.go +++ b/pkg/services/control/server/seal_writecache.go @@ -19,6 +19,7 @@ func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCache prm := engine.SealWriteCachePrm{ ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()), IgnoreErrors: req.GetBody().GetIgnoreErrors(), + Async: req.GetBody().GetAsync(), RestoreMode: req.GetBody().GetRestoreMode(), Shrink: req.GetBody().GetShrink(), } diff --git a/pkg/services/control/service.pb.go b/pkg/services/control/service.pb.go index ac512f1a5..e5a5ce24c 100644 Binary files a/pkg/services/control/service.pb.go and b/pkg/services/control/service.pb.go differ diff --git a/pkg/services/control/service.proto b/pkg/services/control/service.proto index 486f30a93..d6639cb48 100644 --- a/pkg/services/control/service.proto +++ b/pkg/services/control/service.proto @@ -656,6 +656,9 @@ message SealWriteCacheRequest { // Flag indicating whether object read errors should be ignored. bool ignore_errors = 2; + // Flag indicating whether writecache will be sealed async. + bool async = 3; + // If true, then writecache will be sealed, but mode will be restored to the current one. bool restore_mode = 4; diff --git a/pkg/services/control/service_frostfs.pb.go b/pkg/services/control/service_frostfs.pb.go index 417d25c05..822244e77 100644 Binary files a/pkg/services/control/service_frostfs.pb.go and b/pkg/services/control/service_frostfs.pb.go differ