Seal writecache async #1284
10 changed files with 113 additions and 36 deletions

cmd/frostfs-cli/modules/control/writecache.go

@@ -10,6 +10,7 @@ import (
 )
 
 const (
+	asyncFlag       = "async"
 	restoreModeFlag = "restore-mode"
 	shrinkFlag      = "shrink"
 )
@@ -31,12 +32,14 @@ func sealWritecache(cmd *cobra.Command, _ []string) {
 	pk := key.Get(cmd)
 
 	ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
+	async, _ := cmd.Flags().GetBool(asyncFlag)
 	restoreMode, _ := cmd.Flags().GetBool(restoreModeFlag)
 	shrink, _ := cmd.Flags().GetBool(shrinkFlag)
 
 	req := &control.SealWriteCacheRequest{Body: &control.SealWriteCacheRequest_Body{
 		Shard_ID:     getShardIDList(cmd),
 		IgnoreErrors: ignoreErrors,
+		Async:        async,
 		RestoreMode:  restoreMode,
 		Shrink:       shrink,
 	}}
@@ -77,6 +80,7 @@ func initControlShardsWritecacheCmd() {
 	ff.StringSlice(shardIDFlag, nil, "List of shard IDs in base58 encoding")
 	ff.Bool(shardAllFlag, false, "Process all shards")
 	ff.Bool(ignoreErrorsFlag, true, "Skip invalid/unreadable objects")
+	ff.Bool(asyncFlag, false, "Run operation in background")
 	ff.Bool(restoreModeFlag, false, "Restore writecache's mode after sealing")
 	ff.Bool(shrinkFlag, false, "Shrink writecache's internal storage")
internal/logs/logs.go

@@ -539,5 +539,8 @@ const (
 	PolicerCouldNotGetChunk         = "could not get EC chunk"
 	PolicerCouldNotGetChunks        = "could not get EC chunks"
 	AuditEventLogRecord             = "audit event log record"
+	StartedWritecacheSealAsync      = "started writecache seal async"
+	WritecacheSealCompletedAsync    = "writecache seal completed successfully"
+	FailedToSealWritecacheAsync     = "failed to seal writecache async"
 	WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty"
 )
pkg/local_object_storage/engine/writecache.go

@@ -70,6 +70,7 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm
 type SealWriteCachePrm struct {
 	ShardIDs     []*shard.ID
 	IgnoreErrors bool
+	Async        bool
 	RestoreMode  bool
 	Shrink       bool
 }
@@ -117,7 +118,7 @@ func (e *StorageEngine) SealWriteCache(ctx context.Context, prm SealWriteCachePrm
 			return nil
 		}
 
-		err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors, RestoreMode: prm.RestoreMode, Shrink: prm.Shrink})
+		err := sh.SealWriteCache(egCtx, shard.SealWriteCachePrm{IgnoreErrors: prm.IgnoreErrors, Async: prm.Async, RestoreMode: prm.RestoreMode, Shrink: prm.Shrink})
 
 		resGuard.Lock()
 		defer resGuard.Unlock()
pkg/local_object_storage/shard/control.go

@@ -99,12 +99,50 @@ func (x *metabaseSynchronizer) Init() error {
 
 // Init initializes all Shard's components.
 func (s *Shard) Init(ctx context.Context) error {
+	m := s.GetMode()
+	if err := s.initializeComponents(m); err != nil {
+		return err
+	}
+
+	s.updateMetrics(ctx)
+
+	s.gc = &gc{
+		gcCfg:       &s.gcCfg,
+		remover:     s.removeGarbage,
+		stopChannel: make(chan struct{}),
+		eventChan:   make(chan Event),
+		mEventHandler: map[eventType]*eventHandlers{
+			eventNewEpoch: {
+				cancelFunc: func() {},
+				handlers: []eventHandler{
+					s.collectExpiredLocks,
+					s.collectExpiredObjects,
+					s.collectExpiredTombstones,
+					s.collectExpiredMetrics,
+				},
+			},
+		},
+	}
+	if s.gc.metrics != nil {
+		s.gc.metrics.SetShardID(s.info.ID.String())
+	}
+
+	s.gc.init(ctx)
+
+	s.rb = newRebuilder(s.rebuildLimiter)
+	if !m.NoMetabase() {
+		s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
+	}
+	s.writecacheSealCancel.Store(dummyCancel)

dstepanov-yadro commented:
	Functional change. The rest of the diff is just a code refactor (method extraction).

+	return nil
+}
+
+func (s *Shard) initializeComponents(m mode.Mode) error {
 	type initializer interface {
 		Init() error
 	}
 
 	var components []initializer
-	m := s.GetMode()
 
 	if !m.NoMetabase() {
 		var initMetabase initializer
@@ -148,36 +186,6 @@ func (s *Shard) Init(ctx context.Context) error {
 			return fmt.Errorf("could not initialize %T: %w", component, err)
 		}
 	}
 
-	s.updateMetrics(ctx)
-
-	s.gc = &gc{
-		gcCfg:       &s.gcCfg,
-		remover:     s.removeGarbage,
-		stopChannel: make(chan struct{}),
-		eventChan:   make(chan Event),
-		mEventHandler: map[eventType]*eventHandlers{
-			eventNewEpoch: {
-				cancelFunc: func() {},
-				handlers: []eventHandler{
-					s.collectExpiredLocks,
-					s.collectExpiredObjects,
-					s.collectExpiredTombstones,
-					s.collectExpiredMetrics,
-				},
-			},
-		},
-	}
-	if s.gc.metrics != nil {
-		s.gc.metrics.SetShardID(s.info.ID.String())
-	}
-
-	s.gc.init(ctx)
-
-	s.rb = newRebuilder(s.rebuildLimiter)
-	if !m.NoMetabase() {
-		s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
-	}
 	return nil
 }
@@ -350,6 +358,8 @@ func (s *Shard) Close() error {
 	}
 
 	if s.hasWriteCache() {
+		prev := s.writecacheSealCancel.Swap(notInitializedCancel)
+		prev.cancel() // no need to wait: writecache.Seal and writecache.Close lock the same mutex

fyrchik commented:
	Could you explain how having the same mutex makes it OK not to wait?

dstepanov-yadro commented:
	`writecache.Close` will wait for the `writecache.Seal` end.

 		components = append(components, s.writeCache)
 	}
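The exchange above hinges on `writecache.Seal` and `writecache.Close` serializing on one mutex: `Close` does not need to wait for the canceled `Seal` explicitly, because taking the mutex blocks until `Seal` observes the cancellation and releases it. A self-contained sketch of that property, with a simplified `cache` type that is not the real writecache:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

type cache struct{ mu sync.Mutex }

func (c *cache) Seal(ctx context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	select { // long-running flush, aborted by cancellation
	case <-time.After(time.Hour):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Close does not wait for Seal explicitly: taking the same mutex
// blocks until the canceled Seal returns and unlocks it.
func (c *cache) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return nil
}

func main() {
	c := &cache{}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() { done <- c.Seal(ctx) }()
	time.Sleep(10 * time.Millisecond) // let Seal grab the mutex
	cancel()                          // plays the role of prev.cancel() in Shard.Close
	fmt.Println(c.Close() == nil, errors.Is(<-done, context.Canceled))
}
```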
@@ -428,6 +438,9 @@ func (s *Shard) lockExclusive() func() {
 		cancelGC := val.(context.CancelFunc)
 		cancelGC()
 	}
+	if c := s.writecacheSealCancel.Load(); c != nil {
+		c.cancel()
+	}
 	s.m.Lock()
 	s.setModeRequested.Store(false)
 	return s.m.Unlock
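The `lockExclusive` addition is a liveness point: an async seal holds the shard's read lock until its cleanup runs, so a mode change that needs the write lock must cancel the seal first or it would block behind it. A minimal sketch of that cancel-before-lock pattern, with simplified, hypothetical names:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type shard struct {
	m          sync.RWMutex
	sealCancel context.CancelFunc
}

// backgroundSeal holds the read lock until the context is canceled,
// like the async writecache seal holds s.m.RLock until cleanup.
func (s *shard) backgroundSeal(ctx context.Context) {
	s.m.RLock()
	defer s.m.RUnlock()
	<-ctx.Done()
}

// lockExclusive cancels the in-flight seal first; otherwise Lock()
// would wait forever behind the seal's read lock.
func (s *shard) lockExclusive() func() {
	if s.sealCancel != nil {
		s.sealCancel()
	}
	s.m.Lock()
	return s.m.Unlock
}

func main() {
	s := &shard{}
	ctx, cancel := context.WithCancel(context.Background())
	s.sealCancel = cancel
	go s.backgroundSeal(ctx)
	time.Sleep(10 * time.Millisecond) // let the seal take the read lock

	unlock := s.lockExclusive() // returns promptly because the seal was canceled
	fmt.Println("exclusive lock acquired")
	unlock()
}
```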
pkg/local_object_storage/shard/shard.go

@@ -39,6 +39,7 @@ type Shard struct {
 
 	gcCancel             atomic.Value
 	setModeRequested     atomic.Bool
+	writecacheSealCancel atomic.Pointer[writecacheSealCanceler]
 }
 
 // Option represents Shard's constructor option.
@@ -190,6 +191,7 @@ func New(opts ...Option) *Shard {
 	}
 
 	s.fillInfo()
+	s.writecacheSealCancel.Store(notInitializedCancel)
 
 	return s
 }
pkg/local_object_storage/shard/writecache.go

@@ -4,12 +4,24 @@ import (
 	"context"
 	"errors"
 
+	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/zap"
 )
 
+var (
+	dummyCancel          = &writecacheSealCanceler{cancel: func() {}}
+	notInitializedCancel = &writecacheSealCanceler{cancel: func() {}}
+	errWriteCacheSealing = errors.New("writecache is already sealing or shard is not initialized")
+)
+
+type writecacheSealCanceler struct {
+	cancel context.CancelFunc
+}
+
 // FlushWriteCachePrm represents parameters of a `FlushWriteCache` operation.
 type FlushWriteCachePrm struct {
 	ignoreErrors bool
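Taken together with the `Shard` changes above, `writecacheSealCancel` behaves as a three-state guard: `notInitializedCancel` from `New` until `Init` finishes, `dummyCancel` while the shard is idle, and a live canceler while a seal is in flight. A sketch of those transitions using the same sentinel-pointer CAS idea (simplified types, not the real shard):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

type canceler struct{ cancel context.CancelFunc }

var (
	notInitialized = &canceler{cancel: func() {}} // set by New; seal is refused
	idle           = &canceler{cancel: func() {}} // set at the end of Init
	errSealing     = errors.New("writecache is already sealing or shard is not initialized")
)

type guard struct{ c atomic.Pointer[canceler] }

// tryStartSeal succeeds only when the guard holds the idle sentinel,
// so a seal can neither start before Init nor run twice concurrently.
func (g *guard) tryStartSeal(ctx context.Context) (context.Context, func(), error) {
	ctx, cancel := context.WithCancel(ctx)
	c := &canceler{cancel: cancel}
	if !g.c.CompareAndSwap(idle, c) {
		cancel()
		return nil, nil, errSealing
	}
	finish := func() { g.c.Store(idle) } // back to idle when the seal ends
	return ctx, finish, nil
}

func main() {
	var g guard
	g.c.Store(notInitialized) // New()

	_, _, err := g.tryStartSeal(context.Background())
	fmt.Println(err) // refused: shard not initialized

	g.c.Store(idle) // end of Init()
	_, finish, err := g.tryStartSeal(context.Background())
	fmt.Println(err) // nil: seal started

	_, _, err = g.tryStartSeal(context.Background())
	fmt.Println(err) // refused: already sealing

	finish()
}
```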
@@ -60,6 +72,7 @@ func (s *Shard) FlushWriteCache(ctx context.Context, p FlushWriteCachePrm) error
 
 type SealWriteCachePrm struct {
 	IgnoreErrors bool
+	Async        bool
 	RestoreMode  bool
 	Shrink       bool
 }
@@ -78,15 +91,52 @@ func (s *Shard) SealWriteCache(ctx context.Context, p SealWriteCachePrm) error {
 		return errWriteCacheDisabled
 	}
 
+	if p.Async {
+		ctx = context.WithoutCancel(ctx)
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	canceler := &writecacheSealCanceler{cancel: cancel}
+	if !s.writecacheSealCancel.CompareAndSwap(dummyCancel, canceler) {
+		return errWriteCacheSealing

aarifullin commented:
	What do you think about this non-functional change:
	```go
	cleanup := func() {
		s.m.RUnlock()
		s.writecacheSealCancel.Store(dummyCancel)
	}
	```
	and using it in this line and below? :) Just to make it a little more readable.

dstepanov-yadro commented:
	done

+	}
 	s.m.RLock()
-	defer s.m.RUnlock()
+	cleanup := func() {
+		s.m.RUnlock()

fyrchik commented:
	To me it makes reading harder: here it is `p.sync()`, below `p.Async`; I needed to look at the `p.sync` definition to see whether they refer to the same thing.

dstepanov-yadro commented:
	A controversial statement, but fixed.

+		s.writecacheSealCancel.Store(dummyCancel)
+	}
 
 	if s.info.Mode.ReadOnly() {
+		cleanup()
 		return ErrReadOnlyMode
 	}
 	if s.info.Mode.NoMetabase() {
+		cleanup()

fyrchik commented:
	If `p.Async == true && s.info.Mode.ReadOnly()`, we will have a non-dummy canceler in `writecacheSealCancel`, and it won't be called on exit. Is this intentional?

dstepanov-yadro commented:
	Shame on me! Thx! Fixed.

 		return ErrDegradedMode
 	}
 
-	return s.writeCache.Seal(ctx, writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink})
+	if !p.Async {
+		defer cleanup()
+	}

fyrchik commented:
	Is `seal` uncancellable?

dstepanov-yadro commented:
	fixed

+	prm := writecache.SealPrm{IgnoreErrors: p.IgnoreErrors, RestoreMode: p.RestoreMode, Shrink: p.Shrink}
+	if p.Async {
+		started := make(chan struct{})
+		go func() {
+			close(started)
+			defer cleanup()
+
+			s.log.Info(logs.StartedWritecacheSealAsync)
+			if err := s.writeCache.Seal(ctx, prm); err != nil {
+				s.log.Warn(logs.FailedToSealWritecacheAsync, zap.Error(err))
+				return
+			}
+			s.log.Info(logs.WritecacheSealCompletedAsync)
+		}()
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-started:
+			return nil
+		}
+	}
+	return s.writeCache.Seal(ctx, prm)
 }
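The async branch above combines two context steps: `context.WithoutCancel` detaches the background seal from the caller's RPC-scoped cancellation, while the fresh `WithCancel` gives the shard its own kill switch; the `started` channel lets the call return as soon as the goroutine is running. A self-contained sketch of this start-in-background pattern, with a hypothetical `work` function in place of `writeCache.Seal`:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startAsync runs work in the background, detached from the caller's
// cancellation but cancellable via the returned CancelFunc.
func startAsync(ctx context.Context, work func(context.Context) error) (context.CancelFunc, error) {
	ctx = context.WithoutCancel(ctx)       // survive the caller's (e.g. RPC) deadline
	ctx, cancel := context.WithCancel(ctx) // the owner keeps its own kill switch

	started := make(chan struct{})
	go func() {
		close(started)
		if err := work(ctx); err != nil {
			fmt.Println("background work failed:", err) // Warn log in the real code
			return
		}
		fmt.Println("background work completed") // Info log in the real code
	}()

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-started:
		return cancel, nil // report success as soon as the goroutine is running
	}
}

func main() {
	cancel, err := startAsync(context.Background(), func(ctx context.Context) error {
		select {
		case <-time.After(50 * time.Millisecond):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	fmt.Println(err)                   // nil: the seal was started
	time.Sleep(100 * time.Millisecond) // let the background work finish
	cancel()
}
```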
pkg/services/control/server/seal_writecache.go

@@ -19,6 +19,7 @@ func (s *Server) SealWriteCache(ctx context.Context, req *control.SealWriteCacheRequest
 	prm := engine.SealWriteCachePrm{
 		ShardIDs:     s.getShardIDList(req.GetBody().GetShard_ID()),
 		IgnoreErrors: req.GetBody().GetIgnoreErrors(),
+		Async:        req.GetBody().GetAsync(),
 		RestoreMode:  req.GetBody().GetRestoreMode(),
 		Shrink:       req.GetBody().GetShrink(),
 	}
pkg/services/control/service.pb.go (generated): binary file not shown.
pkg/services/control/service.proto

@@ -656,6 +656,9 @@ message SealWriteCacheRequest {
 	// Flag indicating whether object read errors should be ignored.
 	bool ignore_errors = 2;
 
+	// Flag indicating whether writecache will be sealed async.
+	bool async = 3;
+
 	// If true, then writecache will be sealed, but mode will be restored to the current one.
 	bool restore_mode = 4;
BIN
pkg/services/control/service_frostfs.pb.go
generated
BIN
pkg/services/control/service_frostfs.pb.go
generated
Binary file not shown.