From bd27837364a581e183eaff6c802739d2888b14e3 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 21 Apr 2022 16:37:42 +0300 Subject: [PATCH] [#1321] node: Register GC event channel before shard init Morph "NewEpoch" event handling was registered in a closure over `addNewEpochNotificationHandler` func. That may lead to the data race: if a shard was initialized before the event registration, everything works as planned, but if registration was made earlier, it was not able to include GC handlers since a shard has not called `eventChanInit` yet and, therefore, it has not registered handler yet. Signed-off-by: Pavel Karpy --- cmd/neofs-node/config.go | 15 ++++++--------- pkg/local_object_storage/engine/lock_test.go | 8 ++------ pkg/local_object_storage/shard/gc.go | 10 +++------- pkg/local_object_storage/shard/shard.go | 7 +++---- 4 files changed, 14 insertions(+), 26 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 506810b8..f9fb214b 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -401,6 +401,11 @@ func initShardOptions(c *cfg) { metaPerm := metabaseCfg.Perm() fatalOnErr(util.MkdirAllX(filepath.Dir(metaPath), metaPerm)) + gcEventChannel := make(chan shard.Event) + addNewEpochNotificationHandler(c, func(ev event.Event) { + gcEventChannel <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber()) + }) + opts = append(opts, []shard.Option{ shard.WithLogger(c.log), shard.WithRefillMetabase(sc.RefillMetabase()), @@ -435,15 +440,7 @@ func initShardOptions(c *cfg) { return pool }), - shard.WithGCEventChannelInitializer(func() <-chan shard.Event { - ch := make(chan shard.Event) - - addNewEpochNotificationHandler(c, func(ev event.Event) { - ch <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber()) - }) - - return ch - }), + shard.WithGCEventChannel(gcEventChannel), }) }) diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index d1261d3a..03e4582c 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -38,9 +38,7 @@ func TestLockUserScenario(t *testing.T) { e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option { return []shard.Option{ - shard.WithGCEventChannelInitializer(func() <-chan shard.Event { - return chEvents[i] - }), + shard.WithGCEventChannel(chEvents[i]), shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { pool, err := ants.NewPool(sz) require.NoError(t, err) @@ -136,9 +134,7 @@ func TestLockExpiration(t *testing.T) { e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option { return []shard.Option{ - shard.WithGCEventChannelInitializer(func() <-chan shard.Event { - return chEvents[i] - }), + shard.WithGCEventChannel(chEvents[i]), shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { pool, err := ants.NewPool(sz) require.NoError(t, err) diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 57b84b8a..b8ece8ac 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -64,7 +64,7 @@ type gc struct { } type gcCfg struct { - eventChanInit func() <-chan Event + eventChan <-chan Event removerInterval time.Duration @@ -78,9 +78,7 @@ func defaultGCCfg() *gcCfg { close(ch) return &gcCfg{ - eventChanInit: func() <-chan Event { - return ch - }, + eventChan: ch, removerInterval: 10 * time.Second, log: zap.L(), workerPoolInit: func(int) util.WorkerPool { @@ -105,10 +103,8 @@ func (gc *gc) init() { } func (gc *gc) listenEvents() { - eventChan := gc.eventChanInit() - for { - event, ok := <-eventChan + event, ok := <-gc.eventChan if !ok { gc.log.Warn("stop event listener by closed channel") return diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index dfe26d38..63c8d552 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -167,11 +167,10 @@ func WithGCWorkerPoolInitializer(wpInit func(int) util.WorkerPool) Option { } } -// WithGCEventChannelInitializer returns option to set set initializer of -// GC event channel. -func WithGCEventChannelInitializer(chInit func() <-chan Event) Option { +// WithGCEventChannel returns option to set a GC event channel. +func WithGCEventChannel(eventChan <-chan Event) Option { return func(c *cfg) { - c.gcCfg.eventChanInit = chInit + c.gcCfg.eventChan = eventChan } }