Seal writecache async #1284
1 changed files with 39 additions and 32 deletions
|
@ -99,12 +99,50 @@ func (x *metabaseSynchronizer) Init() error {
|
||||||
|
|
||||||
// Init initializes all Shard's components.
|
// Init initializes all Shard's components.
|
||||||
func (s *Shard) Init(ctx context.Context) error {
|
func (s *Shard) Init(ctx context.Context) error {
|
||||||
|
m := s.GetMode()
|
||||||
|
if err := s.initializeComponents(m); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.updateMetrics(ctx)
|
||||||
|
|
||||||
|
s.gc = &gc{
|
||||||
|
gcCfg: &s.gcCfg,
|
||||||
|
remover: s.removeGarbage,
|
||||||
|
stopChannel: make(chan struct{}),
|
||||||
|
eventChan: make(chan Event),
|
||||||
|
mEventHandler: map[eventType]*eventHandlers{
|
||||||
|
eventNewEpoch: {
|
||||||
|
cancelFunc: func() {},
|
||||||
|
handlers: []eventHandler{
|
||||||
|
s.collectExpiredLocks,
|
||||||
|
s.collectExpiredObjects,
|
||||||
|
s.collectExpiredTombstones,
|
||||||
|
s.collectExpiredMetrics,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if s.gc.metrics != nil {
|
||||||
|
s.gc.metrics.SetShardID(s.info.ID.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
s.gc.init(ctx)
|
||||||
|
|
||||||
|
s.rb = newRebuilder(s.rebuildLimiter)
|
||||||
|
if !m.NoMetabase() {
|
||||||
|
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
||||||
|
}
|
||||||
|
s.writecacheSealCancel.Store(dummyCancel)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Shard) initializeComponents(m mode.Mode) error {
|
||||||
type initializer interface {
|
type initializer interface {
|
||||||
Init() error
|
Init() error
|
||||||
}
|
}
|
||||||
|
|
||||||
var components []initializer
|
var components []initializer
|
||||||
m := s.GetMode()
|
|
||||||
|
|
||||||
if !m.NoMetabase() {
|
if !m.NoMetabase() {
|
||||||
var initMetabase initializer
|
var initMetabase initializer
|
||||||
|
@ -148,37 +186,6 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
return fmt.Errorf("could not initialize %T: %w", component, err)
|
return fmt.Errorf("could not initialize %T: %w", component, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.updateMetrics(ctx)
|
|
||||||
|
|
||||||
s.gc = &gc{
|
|
||||||
gcCfg: &s.gcCfg,
|
|
||||||
remover: s.removeGarbage,
|
|
||||||
stopChannel: make(chan struct{}),
|
|
||||||
eventChan: make(chan Event),
|
|
||||||
mEventHandler: map[eventType]*eventHandlers{
|
|
||||||
eventNewEpoch: {
|
|
||||||
cancelFunc: func() {},
|
|
||||||
handlers: []eventHandler{
|
|
||||||
s.collectExpiredLocks,
|
|
||||||
s.collectExpiredObjects,
|
|
||||||
s.collectExpiredTombstones,
|
|
||||||
s.collectExpiredMetrics,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if s.gc.metrics != nil {
|
|
||||||
s.gc.metrics.SetShardID(s.info.ID.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
s.gc.init(ctx)
|
|
||||||
|
|
||||||
s.rb = newRebuilder(s.rebuildLimiter)
|
|
||||||
if !m.NoMetabase() {
|
|
||||||
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
|
||||||
}
|
|
||||||
s.writecacheSealCancel.Store(dummyCancel)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue