[#1284] writecache: Allow to seal writecache async

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-07-31 16:30:07 +03:00
parent 68029d756e
commit 93d63e1632
10 changed files with 75 additions and 5 deletions

View file

@ -10,6 +10,7 @@ import (
) )
const ( const (
asyncFlag = "async"
restoreModeFlag = "restore-mode" restoreModeFlag = "restore-mode"
shrinkFlag = "shrink" shrinkFlag = "shrink"
) )
@ -31,12 +32,14 @@ func sealWritecache(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd) pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
async, _ := cmd.Flags().GetBool(asyncFlag)
restoreMode, _ := cmd.Flags().GetBool(restoreModeFlag) restoreMode, _ := cmd.Flags().GetBool(restoreModeFlag)
shrink, _ := cmd.Flags().GetBool(shrinkFlag) shrink, _ := cmd.Flags().GetBool(shrinkFlag)
req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{ req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{
Shard_ID: getShardIDList(cmd), Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors, IgnoreErrors: ignoreErrors,
Async: async,
RestoreMode: restoreMode, RestoreMode: restoreMode,
Shrink: shrink, Shrink: shrink,
}} }}
@ -77,6 +80,7 @@ func initControlShardsWritecacheCmd() {
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
ff.Bool(shardAllFlag, false, "Process all shards") ff.Bool(shardAllFlag, false, "Process all shards")
ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects") ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
ff.Bool(asyncFlag, false, "Run operation in background")
ff.Bool(restoreModeFlag, false, "Restore writecache's mode after sealing") ff.Bool(restoreModeFlag, false, "Restore writecache's mode after sealing")
ff.Bool(shrinkFlag, false, "Shrink writecache's internal storage") ff.Bool(shrinkFlag, false, "Shrink writecache's internal storage")

View file

@ -539,5 +539,8 @@ const (
PolicerCouldNotGetChunk = "could not get EC chunk" PolicerCouldNotGetChunk = "could not get EC chunk"
PolicerCouldNotGetChunks = "could not get EC chunks" PolicerCouldNotGetChunks = "could not get EC chunks"
AuditEventLogRecord = "audit event log record" AuditEventLogRecord = "audit event log record"
StartedWritecacheSealAsync = "started writecache seal async"
WritecacheSealCompletedAsync = "writecache seal completed successfully"
FailedToSealWritecacheAsync = "failed to seal writecache async"
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty" WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty"
) )

View file

@ -70,6 +70,7 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
type SealWriteCachePrm struct { type SealWriteCachePrm struct {
ShardIDs []*shard.ID ShardIDs []*shard.ID
IgnoreErrors bool IgnoreErrors bool
Async bool
RestoreMode bool RestoreMode bool
Shrink bool Shrink bool
} }
@ -117,7 +118,7 @@ func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePr
return nil return nil
} }
err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors, RestoreMode: prm.RestoreMode, Shrink: prm.Shrink}) err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors, Async: prm.Async, RestoreMode: prm.RestoreMode, Shrink: prm.Shrink})
resGuard.Lock() resGuard.Lock()
defer resGuard.Unlock() defer resGuard.Unlock()

View file

@ -178,6 +178,7 @@ func (s *Shard) Init(ctx context.Context) error {
if !m.NoMetabase() { if !m.NoMetabase() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
} }
s.writecacheSealCancel.Store(dummyCancel)
return nil return nil
} }
@ -350,6 +351,8 @@ func (s *Shard) Close() error {
} }
if s.hasWriteCache() { if s.hasWriteCache() {
prev := s.writecacheSealCancel.Swap(notInitializedCancel)
prev.cancel() // no need to wait: writecache.Seal and writecache.Close lock the same mutex
components = append(components, s.writeCache) components = append(components, s.writeCache)
} }
@ -428,6 +431,9 @@ func (s *Shard) lockExclusive() func() {
cancelGC := val.(context.CancelFunc) cancelGC := val.(context.CancelFunc)
cancelGC() cancelGC()
} }
if c := s.writecacheSealCancel.Load(); c != nil {
c.cancel()
}
s.m.Lock() s.m.Lock()
s.setModeRequested.Store(false) s.setModeRequested.Store(false)
return s.m.Unlock return s.m.Unlock

View file

@ -39,6 +39,7 @@ type Shard struct {
gcCancel atomic.Value gcCancel atomic.Value
setModeRequested atomic.Bool setModeRequested atomic.Bool
writecacheSealCancel atomic.Pointer[writecacheSealCanceler]
} }
// Option represents Shard's constructor option. // Option represents Shard's constructor option.
@ -190,6 +191,7 @@ func New(opts ...Option) *Shard {
} }
s.fillInfo() s.fillInfo()
s.writecacheSealCancel.Store(notInitializedCancel)
return s return s
} }

View file

@ -4,12 +4,24 @@ import (
"context" "context"
"errors" "errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
) )
var (
dummyCancel = &writecacheSealCanceler{cancel: func() {}}
notInitializedCancel = &writecacheSealCanceler{cancel: func() {}}
errWriteCacheSealing = errors.New("writecache is already sealing or shard is not initialized")
)
type writecacheSealCanceler struct {
cancel context.CancelFunc
}
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation. // FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
type FlushWriteCachePrm struct { type FlushWriteCachePrm struct {
ignoreErrors bool ignoreErrors bool
@ -60,6 +72,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
type SealWriteCachePrm struct { type SealWriteCachePrm struct {
IgnoreErrors bool IgnoreErrors bool
Async bool
RestoreMode bool RestoreMode bool
Shrink bool Shrink bool
} }
@ -78,15 +91,52 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
return errWriteCacheDisabled return errWriteCacheDisabled
} }
if p.Async {
ctx = context.WithoutCancel(ctx)
}
ctx, cancel := context.WithCancel(ctx)
canceler := &writecacheSealCanceler{cancel: cancel}
if !s.writecacheSealCancel.CompareAndSwap(dummyCancel, canceler) {
return errWriteCacheSealing
}
s.m.RLock() s.m.RLock()
defer s.m.RUnlock() cleanup := func() {
s.m.RUnlock()
s.writecacheSealCancel.Store(dummyCancel)
}
if s.info.Mode.ReadOnly() { if s.info.Mode.ReadOnly() {
cleanup()
return ErrReadOnlyMode return ErrReadOnlyMode
} }
if s.info.Mode.NoMetabase() { if s.info.Mode.NoMetabase() {
cleanup()
return ErrDegradedMode return ErrDegradedMode
} }
return s.writeCache.Seal(ctx, writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink}) if !p.Async {
defer cleanup()
}
prm := writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink}
if p.Async {
started := make(chan struct{})
go func() {
close(started)
defer cleanup()
s.log.Info(logs.StartedWritecacheSealAsync)
if err := s.writeCache.Seal(ctx, prm); err != nil {
s.log.Warn(logs.FailedToSealWritecacheAsync, zap.Error(err))
return
}
s.log.Info(logs.WritecacheSealCompletedAsync)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-started:
return nil
}
}
return s.writeCache.Seal(ctx, prm)
} }

View file

@ -19,6 +19,7 @@ func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCache
prm := engine.SealWriteCachePrm{ prm := engine.SealWriteCachePrm{
ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()), ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(), IgnoreErrors: req.GetBody().GetIgnoreErrors(),
Async: req.GetBody().GetAsync(),
RestoreMode: req.GetBody().GetRestoreMode(), RestoreMode: req.GetBody().GetRestoreMode(),
Shrink: req.GetBody().GetShrink(), Shrink: req.GetBody().GetShrink(),
} }

Binary file not shown.

View file

@ -656,6 +656,9 @@ message SealWriteCacheRequest {
// Flag indicating whether object read errors should be ignored. // Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2; bool ignore_errors = 2;
// Flag indicating whether writecache will be sealed async.
bool async = 3;
// If true, then writecache will be sealed, but mode will be restored to the current one. // If true, then writecache will be sealed, but mode will be restored to the current one.
bool restore_mode = 4; bool restore_mode = 4;

Binary file not shown.