Create tombstone source when reload #393

Merged
fyrchik merged 2 commits from dstepanov-yadro/frostfs-node:fix/gc_panic into master 2023-05-26 12:14:03 +00:00
2 changed files with 15 additions and 12 deletions

View file

@ -847,18 +847,9 @@ func initLocalStorage(c *cfg) {
// service will be created later // service will be created later
c.cfgObject.getSvc = new(getsvc.Service) c.cfgObject.getSvc = new(getsvc.Service)
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
tombstoneSrc := tsourse.NewSource(tssPrm)
tombstoneSource := tombstone.NewChecker(
tombstone.WithLogger(c.log),
tombstone.WithTombstoneSource(tombstoneSrc),
)
var shardsAttached int var shardsAttached int
for _, optsWithMeta := range c.shardOpts() { for _, optsWithMeta := range c.shardOpts() {
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...) id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
if err != nil { if err != nil {
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err)) c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
} else { } else {
@ -1059,7 +1050,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
var rcfg engine.ReConfiguration var rcfg engine.ReConfiguration
for _, optsWithID := range c.shardOpts() { for _, optsWithID := range c.shardOpts() {
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts) rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource())))
} }
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg) err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
@ -1080,6 +1071,18 @@ func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully) c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
} }
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
tombstoneSrc := tsourse.NewSource(tssPrm)
tombstoneSource := tombstone.NewChecker(
tombstone.WithLogger(c.log),
tombstone.WithTombstoneSource(tombstoneSrc),
)
return tombstoneSource
}
func (c *cfg) shutdown() { func (c *cfg) shutdown() {
c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN) c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)

View file

@ -146,8 +146,8 @@ func (gc *gc) listenEvents(ctx context.Context) {
h := v.handlers[i] h := v.handlers[i]
err := gc.workerPool.Submit(func() { err := gc.workerPool.Submit(func() {
defer v.prevGroup.Done()
h(runCtx, event) h(runCtx, event)
v.prevGroup.Done()
}) })
if err != nil { if err != nil {
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool, gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,