From 2d7166f8d09f2dd822ccd3b7bea3bf5cb8edac4d Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Mon, 26 Sep 2022 09:30:41 +0300
Subject: [PATCH] [#1770] shard: Move NewEpoch event routing to SE level

It enables dynamic shard management: event handlers registered per
shard cannot be removed when the shard is closed, so NewEpoch events
are now delivered to the StorageEngine, which routes them to the
shards it manages.

Signed-off-by: Pavel Karpy
---
 cmd/neofs-node/config.go                     | 10 +--
 .../engine/engine_test.go                    |  4 +-
 pkg/local_object_storage/engine/lock_test.go | 74 ++++++-------
 pkg/local_object_storage/engine/shards.go    | 12 +++
 pkg/local_object_storage/shard/control.go    |  1 +
 pkg/local_object_storage/shard/gc.go         | 15 ++--
 pkg/local_object_storage/shard/shard.go      |  7 --
 7 files changed, 50 insertions(+), 73 deletions(-)

diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go
index f9908bcdc..d84cac288 100644
--- a/cmd/neofs-node/config.go
+++ b/cmd/neofs-node/config.go
@@ -379,6 +379,10 @@ func initLocalStorage(c *cfg) {
 
     ls := engine.New(engineOpts...)
 
+    addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
+        ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
+    })
+
     // allocate memory for the service;
     // service will be created later
     c.cfgObject.getSvc = new(getsvc.Service)
@@ -494,11 +498,6 @@ func initShardOptions(c *cfg) {
             metaPath := metabaseCfg.Path()
             metaPerm := metabaseCfg.BoltDB().Perm()
 
-            gcEventChannel := make(chan shard.Event)
-            addNewEpochNotificationHandler(c, func(ev event.Event) {
-                gcEventChannel <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
-            })
-
             opts = append(opts, []shard.Option{
                 shard.WithLogger(c.log),
                 shard.WithRefillMetabase(sc.RefillMetabase()),
@@ -531,7 +530,6 @@ func initShardOptions(c *cfg) {
 
                     return pool
                 }),
-                shard.WithGCEventChannel(gcEventChannel),
             })
             return nil
         })
diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go
index 6a07f810d..bb2115616 100644
--- a/pkg/local_object_storage/engine/engine_test.go
+++ b/pkg/local_object_storage/engine/engine_test.go
@@ -139,7 +139,7 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
     return s
 }
 
-func testEngineFromShardOpts(t *testing.T, num int, extraOpts func(int) []shard.Option) *StorageEngine {
+func testEngineFromShardOpts(t *testing.T, num int, extraOpts []shard.Option) *StorageEngine {
     engine := New()
     for i := 0; i < num; i++ {
         _, err := engine.AddShard(append([]shard.Option{
@@ -155,7 +155,7 @@ func testEngineFromShardOpts(t *testing.T, num int, extraOpts func(int) []shard.
             ),
             shard.WithPiloramaOptions(
                 pilorama.WithPath(filepath.Join(t.Name(), fmt.Sprintf("pilorama%d", i)))),
-        }, extraOpts(i)...)...)
+        }, extraOpts...)...)
         require.NoError(t, err)
     }
 
diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go
index f24433a7f..d0fa53b54 100644
--- a/pkg/local_object_storage/engine/lock_test.go
+++ b/pkg/local_object_storage/engine/lock_test.go
@@ -38,12 +38,6 @@ func TestLockUserScenario(t *testing.T) {
     // 4. saves tombstone for LOCK-object and receives error
     // 5. waits for an epoch after the lock expiration one
     // 6. tries to inhume the object and expects success
-    chEvents := make([]chan shard.Event, 2)
-
-    for i := range chEvents {
-        chEvents[i] = make(chan shard.Event, 1)
-    }
-
     const lockerExpiresAfter = 13
 
     cnr := cidtest.ID()
     tombForLockID := oidtest.ID()
     tombObj.SetID(tombForLockID)
 
-    e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option {
-        return []shard.Option{
-            shard.WithGCEventChannel(chEvents[i]),
-            shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
-                pool, err := ants.NewPool(sz)
-                require.NoError(t, err)
+    e := testEngineFromShardOpts(t, 2, []shard.Option{
+        shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
+            pool, err := ants.NewPool(sz)
+            require.NoError(t, err)
 
-                return pool
-            }),
-            shard.WithTombstoneSource(tss{lockerExpiresAfter}),
-        }
+            return pool
+        }),
+        shard.WithTombstoneSource(tss{lockerExpiresAfter}),
     })
 
     t.Cleanup(func() {
@@ -137,9 +128,7 @@ func TestLockUserScenario(t *testing.T) {
     require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
 
     // 5.
-    for i := range chEvents {
-        chEvents[i] <- shard.EventNewEpoch(lockerExpiresAfter + 1)
-    }
+    e.HandleNewEpoch(lockerExpiresAfter + 1)
 
     // delay for GC
     time.Sleep(time.Second)
@@ -156,22 +145,14 @@ func TestLockExpiration(t *testing.T) {
     // 2. lock object for it is stored, and the object is locked
     // 3. lock expiration epoch is coming
     // 4. after some delay the object is not locked anymore
-    chEvents := make([]chan shard.Event, 2)
-    for i := range chEvents {
-        chEvents[i] = make(chan shard.Event, 1)
-    }
+    e := testEngineFromShardOpts(t, 2, []shard.Option{
+        shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
+            pool, err := ants.NewPool(sz)
+            require.NoError(t, err)
 
-    e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option {
-        return []shard.Option{
-            shard.WithGCEventChannel(chEvents[i]),
-            shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
-                pool, err := ants.NewPool(sz)
-                require.NoError(t, err)
-
-                return pool
-            }),
-        }
+            return pool
+        }),
     })
 
     t.Cleanup(func() {
@@ -215,9 +196,7 @@ func TestLockExpiration(t *testing.T) {
     require.ErrorAs(t, err, new(apistatus.ObjectLocked))
 
     // 3.
-    for i := range chEvents {
-        chEvents[i] <- shard.EventNewEpoch(lockerExpiresAfter + 1)
-    }
+    e.HandleNewEpoch(lockerExpiresAfter + 1)
 
     // delay for GC processing. It can't be estimated, but making it bigger
     // will slow down test
@@ -237,25 +216,16 @@ func TestLockForceRemoval(t *testing.T) {
     // 3. try to remove lock object and get error
     // 4. force lock object removal
     // 5. the object is not locked anymore
-    chEvents := make([]chan shard.Event, 2)
-
-    for i := range chEvents {
-        chEvents[i] = make(chan shard.Event, 1)
-    }
-
     var e *StorageEngine
 
-    e = testEngineFromShardOpts(t, 2, func(i int) []shard.Option {
-        return []shard.Option{
-            shard.WithGCEventChannel(chEvents[i]),
-            shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
-                pool, err := ants.NewPool(sz)
-                require.NoError(t, err)
+    e = testEngineFromShardOpts(t, 2, []shard.Option{
+        shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
+            pool, err := ants.NewPool(sz)
+            require.NoError(t, err)
 
-                return pool
-            }),
-            shard.WithDeletedLockCallback(e.processDeletedLocks),
-        }
+            return pool
+        }),
+        shard.WithDeletedLockCallback(e.processDeletedLocks),
     })
 
     t.Cleanup(func() {
diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go
index 636132444..f89a68633 100644
--- a/pkg/local_object_storage/engine/shards.go
+++ b/pkg/local_object_storage/engine/shards.go
@@ -176,6 +176,18 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
     return errShardNotFound
 }
 
+// HandleNewEpoch notifies every shard about a NewEpoch event.
+func (e *StorageEngine) HandleNewEpoch(epoch uint64) {
+    ev := shard.EventNewEpoch(epoch)
+
+    e.mtx.RLock()
+    defer e.mtx.RUnlock()
+
+    for _, sh := range e.shards {
+        sh.NotificationChannel() <- ev
+    }
+}
+
 func (s hashedShard) Hash() uint64 {
     return hrw.Hash(
         []byte(s.Shard.ID().String()),
diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go
index 37ca3de51..4fdc2e8dc 100644
--- a/pkg/local_object_storage/shard/control.go
+++ b/pkg/local_object_storage/shard/control.go
@@ -134,6 +134,7 @@ func (s *Shard) Init() error {
         gcCfg:       s.gcCfg,
         remover:     s.removeGarbage,
         stopChannel: make(chan struct{}),
+        eventChan:   make(chan Event),
         mEventHandler: map[eventType]*eventHandlers{
             eventNewEpoch: {
                 cancelFunc: func() {},
diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go
index 6765bfa05..b61ed7ca3 100644
--- a/pkg/local_object_storage/shard/gc.go
+++ b/pkg/local_object_storage/shard/gc.go
@@ -70,12 +70,11 @@ type gc struct {
 
     remover func()
 
+    eventChan     chan Event
     mEventHandler map[eventType]*eventHandlers
 }
 
 type gcCfg struct {
-    eventChan <-chan Event
-
     removerInterval time.Duration
 
     log *logger.Logger
@@ -84,11 +83,7 @@
 }
 
 func defaultGCCfg() *gcCfg {
-    ch := make(chan Event)
-    close(ch)
-
     return &gcCfg{
-        eventChan:       ch,
         removerInterval: 10 * time.Second,
         log:             zap.L(),
         workerPoolInit: func(int) util.WorkerPool {
@@ -161,6 +156,9 @@ func (gc *gc) tickRemover() {
             if gc.workerPool != nil {
                 gc.workerPool.Release()
             }
+
+            close(gc.eventChan)
+
             gc.log.Debug("GC is stopped")
             return
         case <-timer.C:
@@ -414,3 +412,8 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
         return
     }
 }
+
+// NotificationChannel returns the channel for shard events.
+func (s *Shard) NotificationChannel() chan<- Event {
+    return s.gc.eventChan
+}
diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go
index 1c386a48c..8cdad3963 100644
--- a/pkg/local_object_storage/shard/shard.go
+++ b/pkg/local_object_storage/shard/shard.go
@@ -215,13 +215,6 @@ func WithGCWorkerPoolInitializer(wpInit func(int) util.WorkerPool) Option {
     }
 }
 
-// WithGCEventChannel returns option to set a GC event channel.
-func WithGCEventChannel(eventChan <-chan Event) Option {
-    return func(c *cfg) {
-        c.gcCfg.eventChan = eventChan
-    }
-}
-
 // WithGCRemoverSleepInterval returns option to specify sleep
 // interval between object remover executions.
 func WithGCRemoverSleepInterval(dur time.Duration) Option {
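
For readers who want the shape of the change in isolation, here is a minimal,
self-contained Go sketch of the engine-level event fan-out this patch
introduces. All names in it (event, toyShard, toyEngine, gcLoop) are
illustrative stand-ins rather than NeoFS APIs, and the sketch deliberately
omits the worker pools, handler maps, and mode checks of the real shard GC.

package main

import (
    "fmt"
    "sync"
)

// event plays the role of shard.Event: all the GC needs from the
// outside world is the epoch number.
type event struct{ epoch uint64 }

// toyShard stands in for shard.Shard: it owns the channel its GC
// goroutine consumes, mirroring gc.eventChan in the patch.
type toyShard struct {
    id     int
    events chan event
    done   chan struct{}
    wg     sync.WaitGroup
}

func newToyShard(id int) *toyShard {
    s := &toyShard{
        id:     id,
        events: make(chan event),
        done:   make(chan struct{}),
    }
    s.wg.Add(1)
    go s.gcLoop()
    return s
}

// gcLoop consumes events until the shard is closed, like the GC
// event listener inside the shard package.
func (s *toyShard) gcLoop() {
    defer s.wg.Done()
    for {
        select {
        case <-s.done:
            return
        case ev := <-s.events:
            fmt.Printf("shard %d: GC reacts to epoch %d\n", s.id, ev.epoch)
        }
    }
}

// NotificationChannel mirrors the send-only accessor the patch adds to Shard.
func (s *toyShard) NotificationChannel() chan<- event { return s.events }

// Close stops the GC goroutine. No node-level subscription has to be
// unregistered, which is the point of moving the routing up a level.
func (s *toyShard) Close() {
    close(s.done)
    s.wg.Wait()
}

// toyEngine stands in for StorageEngine: it owns the shard set, so it
// is the natural owner of event distribution.
type toyEngine struct {
    mtx    sync.RWMutex
    shards []*toyShard
}

// HandleNewEpoch mirrors StorageEngine.HandleNewEpoch: one external
// notification is fanned out to every currently attached shard.
func (e *toyEngine) HandleNewEpoch(epoch uint64) {
    e.mtx.RLock()
    defer e.mtx.RUnlock()

    for _, sh := range e.shards {
        sh.NotificationChannel() <- event{epoch: epoch}
    }
}

func main() {
    e := &toyEngine{shards: []*toyShard{newToyShard(0), newToyShard(1)}}

    // A single entry point for the whole node; shards can be added or
    // removed without touching any external subscription.
    e.HandleNewEpoch(14)

    for _, sh := range e.shards {
        sh.Close()
    }
}

The design point is that the StorageEngine already owns the shard set, so it
is the one place that always knows which shards exist: the node keeps a single
epoch subscription (addNewEpochAsyncNotificationHandler forwarding to
StorageEngine.HandleNewEpoch), and attaching or detaching a shard only mutates
the engine's shard list. Under the old WithGCEventChannel option, each shard's
channel was wired to a node-level handler at startup, and that handler could
not be unregistered when the shard was closed.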