Seal writecache async #1284

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:feat/writecache_seal_async into master 2024-09-04 19:51:10 +00:00
10 changed files with 113 additions and 36 deletions

View file

@ -10,6 +10,7 @@ import (
) )
const ( const (
asyncFlag = "async"
restoreModeFlag = "restore-mode" restoreModeFlag = "restore-mode"
shrinkFlag = "shrink" shrinkFlag = "shrink"
) )
@ -31,12 +32,14 @@ func sealWritecache(cmd *cobra.Command, _ []string) {
pk := key.Get(cmd) pk := key.Get(cmd)
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag) ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
async, _ := cmd.Flags().GetBool(asyncFlag)
restoreMode, _ := cmd.Flags().GetBool(restoreModeFlag) restoreMode, _ := cmd.Flags().GetBool(restoreModeFlag)
shrink, _ := cmd.Flags().GetBool(shrinkFlag) shrink, _ := cmd.Flags().GetBool(shrinkFlag)
req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{ req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{
Shard_ID: getShardIDList(cmd), Shard_ID: getShardIDList(cmd),
IgnoreErrors: ignoreErrors, IgnoreErrors: ignoreErrors,
Async: async,
RestoreMode: restoreMode, RestoreMode: restoreMode,
Shrink: shrink, Shrink: shrink,
}} }}
@ -77,6 +80,7 @@ func initControlShardsWritecacheCmd() {
ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding") ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
ff.Bool(shardAllFlag, false, "Process all shards") ff.Bool(shardAllFlag, false, "Process all shards")
ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects") ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
ff.Bool(asyncFlag, false, "Run operation in background")
ff.Bool(restoreModeFlag, false, "Restore writecache's mode after sealing") ff.Bool(restoreModeFlag, false, "Restore writecache's mode after sealing")
ff.Bool(shrinkFlag, false, "Shrink writecache's internal storage") ff.Bool(shrinkFlag, false, "Shrink writecache's internal storage")

View file

@ -539,5 +539,8 @@ const (
PolicerCouldNotGetChunk = "could not get EC chunk" PolicerCouldNotGetChunk = "could not get EC chunk"
PolicerCouldNotGetChunks = "could not get EC chunks" PolicerCouldNotGetChunks = "could not get EC chunks"
AuditEventLogRecord = "audit event log record" AuditEventLogRecord = "audit event log record"
StartedWritecacheSealAsync = "started writecache seal async"
WritecacheSealCompletedAsync = "writecache seal completed successfully"
FailedToSealWritecacheAsync = "failed to seal writecache async"
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty" WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty"
) )

View file

@ -70,6 +70,7 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
type SealWriteCachePrm struct { type SealWriteCachePrm struct {
ShardIDs []*shard.ID ShardIDs []*shard.ID
IgnoreErrors bool IgnoreErrors bool
Async bool
RestoreMode bool RestoreMode bool
Shrink bool Shrink bool
} }
@ -117,7 +118,7 @@ func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePr
return nil return nil
} }
err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors, RestoreMode: prm.RestoreMode, Shrink: prm.Shrink}) err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors, Async: prm.Async, RestoreMode: prm.RestoreMode, Shrink: prm.Shrink})
resGuard.Lock() resGuard.Lock()
defer resGuard.Unlock() defer resGuard.Unlock()

View file

@ -99,12 +99,50 @@ func (x *metabaseSynchronizer) Init() error {
// Init initializes all Shard's components. // Init initializes all Shard's components.
func (s *Shard) Init(ctx context.Context) error { func (s *Shard) Init(ctx context.Context) error {
m := s.GetMode()
if err := s.initializeComponents(m); err != nil {
return err
}
s.updateMetrics(ctx)
s.gc = &gc{
gcCfg: &s.gcCfg,
remover: s.removeGarbage,
stopChannel: make(chan struct{}),
eventChan: make(chan Event),
mEventHandler: map[eventType]*eventHandlers{
eventNewEpoch: {
cancelFunc: func() {},
handlers: []eventHandler{
s.collectExpiredLocks,
s.collectExpiredObjects,
s.collectExpiredTombstones,
s.collectExpiredMetrics,
},
},
},
}
if s.gc.metrics != nil {
s.gc.metrics.SetShardID(s.info.ID.String())
}
s.gc.init(ctx)
s.rb = newRebuilder(s.rebuildLimiter)
if !m.NoMetabase() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
}
s.writecacheSealCancel.Store(dummyCancel)

To allow writecache seal only after Init

To allow writecache seal only after `Init`

Functional change. Other diff is just code refactor (method extract).

Functional change. Other diff is just code refactor (method extract).
return nil
}
func (s *Shard) initializeComponents(m mode.Mode) error {
type initializer interface { type initializer interface {
Init() error Init() error
} }
var components []initializer var components []initializer
m := s.GetMode()
if !m.NoMetabase() { if !m.NoMetabase() {
var initMetabase initializer var initMetabase initializer
@ -148,36 +186,6 @@ func (s *Shard) Init(ctx context.Context) error {
return fmt.Errorf("could not initialize %T: %w", component, err) return fmt.Errorf("could not initialize %T: %w", component, err)
} }
} }
s.updateMetrics(ctx)
s.gc = &gc{
gcCfg: &s.gcCfg,
remover: s.removeGarbage,
stopChannel: make(chan struct{}),
eventChan: make(chan Event),
mEventHandler: map[eventType]*eventHandlers{
eventNewEpoch: {
cancelFunc: func() {},
handlers: []eventHandler{
s.collectExpiredLocks,
s.collectExpiredObjects,
s.collectExpiredTombstones,
s.collectExpiredMetrics,
},
},
},
}
if s.gc.metrics != nil {
s.gc.metrics.SetShardID(s.info.ID.String())
}
s.gc.init(ctx)
s.rb = newRebuilder(s.rebuildLimiter)
if !m.NoMetabase() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
}
return nil return nil
} }
@ -350,6 +358,8 @@ func (s *Shard) Close() error {
} }
if s.hasWriteCache() { if s.hasWriteCache() {
prev := s.writecacheSealCancel.Swap(notInitializedCancel)
prev.cancel() // no need to wait: writecache.Seal and writecache.Close lock the same mutex

Could you explain, how having the same mutex makes it ok not to wait?

Could you explain, how having the same mutex makes it ok not to wait?

writecache.Close will wait for the writecache.Seal end.

`writecache.Close` will wait for the `writecache.Seal` end.
components = append(components, s.writeCache) components = append(components, s.writeCache)
} }
@ -428,6 +438,9 @@ func (s *Shard) lockExclusive() func() {
cancelGC := val.(context.CancelFunc) cancelGC := val.(context.CancelFunc)
cancelGC() cancelGC()
} }
if c := s.writecacheSealCancel.Load(); c != nil {
c.cancel()
}
s.m.Lock() s.m.Lock()
s.setModeRequested.Store(false) s.setModeRequested.Store(false)
return s.m.Unlock return s.m.Unlock

View file

@ -39,6 +39,7 @@ type Shard struct {
gcCancel atomic.Value gcCancel atomic.Value
setModeRequested atomic.Bool setModeRequested atomic.Bool
writecacheSealCancel atomic.Pointer[writecacheSealCanceler]
} }
// Option represents Shard's constructor option. // Option represents Shard's constructor option.
@ -190,6 +191,7 @@ func New(opts ...Option) *Shard {
} }
s.fillInfo() s.fillInfo()
s.writecacheSealCancel.Store(notInitializedCancel)
return s return s
} }

View file

@ -4,12 +4,24 @@ import (
"context" "context"
"errors" "errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
) )
var (
dummyCancel = &writecacheSealCanceler{cancel: func() {}}
notInitializedCancel = &writecacheSealCanceler{cancel: func() {}}
errWriteCacheSealing = errors.New("writecache is already sealing or shard is not initialized")
)
type writecacheSealCanceler struct {
cancel context.CancelFunc
}
// FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation. // FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
type FlushWriteCachePrm struct { type FlushWriteCachePrm struct {
ignoreErrors bool ignoreErrors bool
@ -60,6 +72,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
type SealWriteCachePrm struct { type SealWriteCachePrm struct {
IgnoreErrors bool IgnoreErrors bool
Async bool
RestoreMode bool RestoreMode bool
Shrink bool Shrink bool
} }
@ -78,15 +91,52 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
return errWriteCacheDisabled return errWriteCacheDisabled
} }
if p.Async {
ctx = context.WithoutCancel(ctx)
}
ctx, cancel := context.WithCancel(ctx)
canceler := &writecacheSealCanceler{cancel: cancel}
if !s.writecacheSealCancel.CompareAndSwap(dummyCancel, canceler) {
return errWriteCacheSealing

What do you think about this non-functional change:

cleanup := func() {
   s.m.RUnlock()
   s.writecacheSealCancel.Store(dummyCancel)
}

and use it in this line and below? :) Just make it little bit readable

What do you think about this non-functional change: ```go cleanup := func() { s.m.RUnlock() s.writecacheSealCancel.Store(dummyCancel) } ``` and use it in this line and below? :) Just make it little bit readable

done

done
}
s.m.RLock() s.m.RLock()
defer s.m.RUnlock() cleanup := func() {
s.m.RUnlock()

To me it makes reading harder: here it is p.sync(), below p.Async, I needed to look at p.sync definition to see if they correspond to the same thing.

To me it makes reading harder: here it is `p.sync()`, below `p.Async`, I needed to look at `p.sync` definition to see if they correspond to the same thing.

A controversial statement, but fixed.

A controversial statement, but fixed.
s.writecacheSealCancel.Store(dummyCancel)
}
if s.info.Mode.ReadOnly() { if s.info.Mode.ReadOnly() {
cleanup()
return ErrReadOnlyMode return ErrReadOnlyMode
} }
if s.info.Mode.NoMetabase() { if s.info.Mode.NoMetabase() {
cleanup()

If p.Async == true && s.info.Mode.ReadOnly() we will have non-dummy canceler in writecacheSealCancel and it won't be called on exit.
It this intentional?

If `p.Async == true && s.info.Mode.ReadOnly()` we will have non-dummy canceler in `writecacheSealCancel` and it won't be called on exit. It this intentional?

Shame on me! Thx! Fixed.

Shame on me! Thx! Fixed.
return ErrDegradedMode return ErrDegradedMode
} }
return s.writeCache.Seal(ctx, writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink}) if !p.Async {
defer cleanup()
}

Is seal uncancellable?

Is `seal` uncancellable?

fixed

fixed
prm := writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink}
if p.Async {
started := make(chan struct{})
go func() {
close(started)
defer cleanup()
s.log.Info(logs.StartedWritecacheSealAsync)
if err := s.writeCache.Seal(ctx, prm); err != nil {
s.log.Warn(logs.FailedToSealWritecacheAsync, zap.Error(err))
return
}
s.log.Info(logs.WritecacheSealCompletedAsync)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-started:
return nil
}
}
return s.writeCache.Seal(ctx, prm)
} }

View file

@ -19,6 +19,7 @@ func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCache
prm := engine.SealWriteCachePrm{ prm := engine.SealWriteCachePrm{
ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()), ShardIDs: s.getShardIDList(req.GetBody().GetShard_ID()),
IgnoreErrors: req.GetBody().GetIgnoreErrors(), IgnoreErrors: req.GetBody().GetIgnoreErrors(),
Async: req.GetBody().GetAsync(),
RestoreMode: req.GetBody().GetRestoreMode(), RestoreMode: req.GetBody().GetRestoreMode(),
Shrink: req.GetBody().GetShrink(), Shrink: req.GetBody().GetShrink(),
} }

Binary file not shown.

View file

@ -656,6 +656,9 @@ message SealWriteCacheRequest {
// Flag indicating whether object read errors should be ignored. // Flag indicating whether object read errors should be ignored.
bool ignore_errors = 2; bool ignore_errors = 2;
// Flag indicating whether writecache will be sealed async.
bool async = 3;
// If true, then writecache will be sealed, but mode will be restored to the current one. // If true, then writecache will be sealed, but mode will be restored to the current one.
bool restore_mode = 4; bool restore_mode = 4;

Binary file not shown.