[#1770] shard: Move NewEpoch event routing on SE level

It will allow dynamic shard management. Closing a shard does not allow
removing event handlers.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-09-26 09:30:41 +03:00 committed by fyrchik
parent 9374823950
commit 2d7166f8d0
7 changed files with 50 additions and 73 deletions

View file

@ -379,6 +379,10 @@ func initLocalStorage(c *cfg) {
ls := engine.New(engineOpts...) ls := engine.New(engineOpts...)
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
})
// allocate memory for the service; // allocate memory for the service;
// service will be created later // service will be created later
c.cfgObject.getSvc = new(getsvc.Service) c.cfgObject.getSvc = new(getsvc.Service)
@ -494,11 +498,6 @@ func initShardOptions(c *cfg) {
metaPath := metabaseCfg.Path() metaPath := metabaseCfg.Path()
metaPerm := metabaseCfg.BoltDB().Perm() metaPerm := metabaseCfg.BoltDB().Perm()
gcEventChannel := make(chan shard.Event)
addNewEpochNotificationHandler(c, func(ev event.Event) {
gcEventChannel <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
})
opts = append(opts, []shard.Option{ opts = append(opts, []shard.Option{
shard.WithLogger(c.log), shard.WithLogger(c.log),
shard.WithRefillMetabase(sc.RefillMetabase()), shard.WithRefillMetabase(sc.RefillMetabase()),
@ -531,7 +530,6 @@ func initShardOptions(c *cfg) {
return pool return pool
}), }),
shard.WithGCEventChannel(gcEventChannel),
}) })
return nil return nil
}) })

View file

@ -139,7 +139,7 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
return s return s
} }
func testEngineFromShardOpts(t *testing.T, num int, extraOpts func(int) []shard.Option) *StorageEngine { func testEngineFromShardOpts(t *testing.T, num int, extraOpts []shard.Option) *StorageEngine {
engine := New() engine := New()
for i := 0; i < num; i++ { for i := 0; i < num; i++ {
_, err := engine.AddShard(append([]shard.Option{ _, err := engine.AddShard(append([]shard.Option{
@ -155,7 +155,7 @@ func testEngineFromShardOpts(t *testing.T, num int, extraOpts func(int) []shard.
), ),
shard.WithPiloramaOptions( shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(t.Name(), fmt.Sprintf("pilorama%d", i)))), pilorama.WithPath(filepath.Join(t.Name(), fmt.Sprintf("pilorama%d", i)))),
}, extraOpts(i)...)...) }, extraOpts...)...)
require.NoError(t, err) require.NoError(t, err)
} }

View file

@ -38,12 +38,6 @@ func TestLockUserScenario(t *testing.T) {
// 4. saves tombstone for LOCK-object and receives error // 4. saves tombstone for LOCK-object and receives error
// 5. waits for an epoch after the lock expiration one // 5. waits for an epoch after the lock expiration one
// 6. tries to inhume the object and expects success // 6. tries to inhume the object and expects success
chEvents := make([]chan shard.Event, 2)
for i := range chEvents {
chEvents[i] = make(chan shard.Event, 1)
}
const lockerExpiresAfter = 13 const lockerExpiresAfter = 13
cnr := cidtest.ID() cnr := cidtest.ID()
@ -51,9 +45,7 @@ func TestLockUserScenario(t *testing.T) {
tombForLockID := oidtest.ID() tombForLockID := oidtest.ID()
tombObj.SetID(tombForLockID) tombObj.SetID(tombForLockID)
e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option { e := testEngineFromShardOpts(t, 2, []shard.Option{
return []shard.Option{
shard.WithGCEventChannel(chEvents[i]),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz) pool, err := ants.NewPool(sz)
require.NoError(t, err) require.NoError(t, err)
@ -61,7 +53,6 @@ func TestLockUserScenario(t *testing.T) {
return pool return pool
}), }),
shard.WithTombstoneSource(tss{lockerExpiresAfter}), shard.WithTombstoneSource(tss{lockerExpiresAfter}),
}
}) })
t.Cleanup(func() { t.Cleanup(func() {
@ -137,9 +128,7 @@ func TestLockUserScenario(t *testing.T) {
require.ErrorIs(t, err, meta.ErrLockObjectRemoval) require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
// 5. // 5.
for i := range chEvents { e.HandleNewEpoch(lockerExpiresAfter + 1)
chEvents[i] <- shard.EventNewEpoch(lockerExpiresAfter + 1)
}
// delay for GC // delay for GC
time.Sleep(time.Second) time.Sleep(time.Second)
@ -156,22 +145,14 @@ func TestLockExpiration(t *testing.T) {
// 2. lock object for it is stored, and the object is locked // 2. lock object for it is stored, and the object is locked
// 3. lock expiration epoch is coming // 3. lock expiration epoch is coming
// 4. after some delay the object is not locked anymore // 4. after some delay the object is not locked anymore
chEvents := make([]chan shard.Event, 2)
for i := range chEvents { e := testEngineFromShardOpts(t, 2, []shard.Option{
chEvents[i] = make(chan shard.Event, 1)
}
e := testEngineFromShardOpts(t, 2, func(i int) []shard.Option {
return []shard.Option{
shard.WithGCEventChannel(chEvents[i]),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz) pool, err := ants.NewPool(sz)
require.NoError(t, err) require.NoError(t, err)
return pool return pool
}), }),
}
}) })
t.Cleanup(func() { t.Cleanup(func() {
@ -215,9 +196,7 @@ func TestLockExpiration(t *testing.T) {
require.ErrorAs(t, err, new(apistatus.ObjectLocked)) require.ErrorAs(t, err, new(apistatus.ObjectLocked))
// 3. // 3.
for i := range chEvents { e.HandleNewEpoch(lockerExpiresAfter + 1)
chEvents[i] <- shard.EventNewEpoch(lockerExpiresAfter + 1)
}
// delay for GC processing. It can't be estimated, but making it bigger // delay for GC processing. It can't be estimated, but making it bigger
// will slow down test // will slow down test
@ -237,17 +216,9 @@ func TestLockForceRemoval(t *testing.T) {
// 3. try to remove lock object and get error // 3. try to remove lock object and get error
// 4. force lock object removal // 4. force lock object removal
// 5. the object is not locked anymore // 5. the object is not locked anymore
chEvents := make([]chan shard.Event, 2)
for i := range chEvents {
chEvents[i] = make(chan shard.Event, 1)
}
var e *StorageEngine var e *StorageEngine
e = testEngineFromShardOpts(t, 2, func(i int) []shard.Option { e = testEngineFromShardOpts(t, 2, []shard.Option{
return []shard.Option{
shard.WithGCEventChannel(chEvents[i]),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz) pool, err := ants.NewPool(sz)
require.NoError(t, err) require.NoError(t, err)
@ -255,7 +226,6 @@ func TestLockForceRemoval(t *testing.T) {
return pool return pool
}), }),
shard.WithDeletedLockCallback(e.processDeletedLocks), shard.WithDeletedLockCallback(e.processDeletedLocks),
}
}) })
t.Cleanup(func() { t.Cleanup(func() {

View file

@ -176,6 +176,18 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
return errShardNotFound return errShardNotFound
} }
// HandleNewEpoch notifies every shard about NewEpoch event.
func (e *StorageEngine) HandleNewEpoch(epoch uint64) {
ev := shard.EventNewEpoch(epoch)
e.mtx.RLock()
defer e.mtx.RUnlock()
for _, sh := range e.shards {
sh.NotificationChannel() <- ev
}
}
func (s hashedShard) Hash() uint64 { func (s hashedShard) Hash() uint64 {
return hrw.Hash( return hrw.Hash(
[]byte(s.Shard.ID().String()), []byte(s.Shard.ID().String()),

View file

@ -134,6 +134,7 @@ func (s *Shard) Init() error {
gcCfg: s.gcCfg, gcCfg: s.gcCfg,
remover: s.removeGarbage, remover: s.removeGarbage,
stopChannel: make(chan struct{}), stopChannel: make(chan struct{}),
eventChan: make(chan Event),
mEventHandler: map[eventType]*eventHandlers{ mEventHandler: map[eventType]*eventHandlers{
eventNewEpoch: { eventNewEpoch: {
cancelFunc: func() {}, cancelFunc: func() {},

View file

@ -70,12 +70,11 @@ type gc struct {
remover func() remover func()
eventChan chan Event
mEventHandler map[eventType]*eventHandlers mEventHandler map[eventType]*eventHandlers
} }
type gcCfg struct { type gcCfg struct {
eventChan <-chan Event
removerInterval time.Duration removerInterval time.Duration
log *logger.Logger log *logger.Logger
@ -84,11 +83,7 @@ type gcCfg struct {
} }
func defaultGCCfg() *gcCfg { func defaultGCCfg() *gcCfg {
ch := make(chan Event)
close(ch)
return &gcCfg{ return &gcCfg{
eventChan: ch,
removerInterval: 10 * time.Second, removerInterval: 10 * time.Second,
log: zap.L(), log: zap.L(),
workerPoolInit: func(int) util.WorkerPool { workerPoolInit: func(int) util.WorkerPool {
@ -161,6 +156,9 @@ func (gc *gc) tickRemover() {
if gc.workerPool != nil { if gc.workerPool != nil {
gc.workerPool.Release() gc.workerPool.Release()
} }
close(gc.eventChan)
gc.log.Debug("GC is stopped") gc.log.Debug("GC is stopped")
return return
case <-timer.C: case <-timer.C:
@ -414,3 +412,8 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) {
return return
} }
} }
// NotificationChannel returns channel for shard events.
func (s *Shard) NotificationChannel() chan<- Event {
return s.gc.eventChan
}

View file

@ -215,13 +215,6 @@ func WithGCWorkerPoolInitializer(wpInit func(int) util.WorkerPool) Option {
} }
} }
// WithGCEventChannel returns option to set a GC event channel.
func WithGCEventChannel(eventChan <-chan Event) Option {
return func(c *cfg) {
c.gcCfg.eventChan = eventChan
}
}
// WithGCRemoverSleepInterval returns option to specify sleep // WithGCRemoverSleepInterval returns option to specify sleep
// interval between object remover executions. // interval between object remover executions.
func WithGCRemoverSleepInterval(dur time.Duration) Option { func WithGCRemoverSleepInterval(dur time.Duration) Option {