forked from TrueCloudLab/frostfs-node
[#1321] node: Register GC event channel before shard init
Morph "NewEpoch" event handling was registered in a closure over `addNewEpochNotificationHandler` func. That may lead to the data race: if a shard was initialized before the event registration, everything works as planned, but if registration was made earlier, it was not able to include GC handlers since a shard has not called `eventChanInit` yet and, therefore, it has not registered handler yet. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
d4569946c5
commit
bd27837364
4 changed files with 14 additions and 26 deletions
|
@ -401,6 +401,11 @@ func initShardOptions(c *cfg) {
|
||||||
metaPerm := metabaseCfg.Perm()
|
metaPerm := metabaseCfg.Perm()
|
||||||
fatalOnErr(util.MkdirAllX(filepath.Dir(metaPath), metaPerm))
|
fatalOnErr(util.MkdirAllX(filepath.Dir(metaPath), metaPerm))
|
||||||
|
|
||||||
|
gcEventChannel := make(chan shard.Event)
|
||||||
|
addNewEpochNotificationHandler(c, func(ev event.Event) {
|
||||||
|
gcEventChannel <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
|
||||||
|
})
|
||||||
|
|
||||||
opts = append(opts, []shard.Option{
|
opts = append(opts, []shard.Option{
|
||||||
shard.WithLogger(c.log),
|
shard.WithLogger(c.log),
|
||||||
shard.WithRefillMetabase(sc.RefillMetabase()),
|
shard.WithRefillMetabase(sc.RefillMetabase()),
|
||||||
|
@ -435,15 +440,7 @@ func initShardOptions(c *cfg) {
|
||||||
|
|
||||||
return pool
|
return pool
|
||||||
}),
|
}),
|
||||||
shard.WithGCEventChannelInitializer(func() <-chan shard.Event {
|
shard.WithGCEventChannel(gcEventChannel),
|
||||||
ch := make(chan shard.Event)
|
|
||||||
|
|
||||||
addNewEpochNotificationHandler(c, func(ev event.Event) {
|
|
||||||
ch <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
|
|
||||||
})
|
|
||||||
|
|
||||||
return ch
|
|
||||||
}),
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -38,9 +38,7 @@ func TestLockUserScenario(t *testing.T) {
|
||||||
|
|
||||||
e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option {
|
e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option {
|
||||||
return []shard.Option{
|
return []shard.Option{
|
||||||
shard.WithGCEventChannelInitializer(func() <-chan shard.Event {
|
shard.WithGCEventChannel(chEvents[i]),
|
||||||
return chEvents[i]
|
|
||||||
}),
|
|
||||||
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
||||||
pool, err := ants.NewPool(sz)
|
pool, err := ants.NewPool(sz)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -136,9 +134,7 @@ func TestLockExpiration(t *testing.T) {
|
||||||
|
|
||||||
e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option {
|
e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option {
|
||||||
return []shard.Option{
|
return []shard.Option{
|
||||||
shard.WithGCEventChannelInitializer(func() <-chan shard.Event {
|
shard.WithGCEventChannel(chEvents[i]),
|
||||||
return chEvents[i]
|
|
||||||
}),
|
|
||||||
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
||||||
pool, err := ants.NewPool(sz)
|
pool, err := ants.NewPool(sz)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -64,7 +64,7 @@ type gc struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type gcCfg struct {
|
type gcCfg struct {
|
||||||
eventChanInit func() <-chan Event
|
eventChan <-chan Event
|
||||||
|
|
||||||
removerInterval time.Duration
|
removerInterval time.Duration
|
||||||
|
|
||||||
|
@ -78,9 +78,7 @@ func defaultGCCfg() *gcCfg {
|
||||||
close(ch)
|
close(ch)
|
||||||
|
|
||||||
return &gcCfg{
|
return &gcCfg{
|
||||||
eventChanInit: func() <-chan Event {
|
eventChan: ch,
|
||||||
return ch
|
|
||||||
},
|
|
||||||
removerInterval: 10 * time.Second,
|
removerInterval: 10 * time.Second,
|
||||||
log: zap.L(),
|
log: zap.L(),
|
||||||
workerPoolInit: func(int) util.WorkerPool {
|
workerPoolInit: func(int) util.WorkerPool {
|
||||||
|
@ -105,10 +103,8 @@ func (gc *gc) init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gc *gc) listenEvents() {
|
func (gc *gc) listenEvents() {
|
||||||
eventChan := gc.eventChanInit()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
event, ok := <-eventChan
|
event, ok := <-gc.eventChan
|
||||||
if !ok {
|
if !ok {
|
||||||
gc.log.Warn("stop event listener by closed channel")
|
gc.log.Warn("stop event listener by closed channel")
|
||||||
return
|
return
|
||||||
|
|
|
@ -167,11 +167,10 @@ func WithGCWorkerPoolInitializer(wpInit func(int) util.WorkerPool) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithGCEventChannelInitializer returns option to set set initializer of
|
// WithGCEventChannel returns option to set a GC event channel.
|
||||||
// GC event channel.
|
func WithGCEventChannel(eventChan <-chan Event) Option {
|
||||||
func WithGCEventChannelInitializer(chInit func() <-chan Event) Option {
|
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.gcCfg.eventChanInit = chInit
|
c.gcCfg.eventChan = eventChan
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue