Create tombstone source when reload #393
2 changed files with 15 additions and 12 deletions
|
@ -847,18 +847,9 @@ func initLocalStorage(c *cfg) {
|
||||||
// service will be created later
|
// service will be created later
|
||||||
c.cfgObject.getSvc = new(getsvc.Service)
|
c.cfgObject.getSvc = new(getsvc.Service)
|
||||||
|
|
||||||
var tssPrm tsourse.TombstoneSourcePrm
|
|
||||||
tssPrm.SetGetService(c.cfgObject.getSvc)
|
|
||||||
tombstoneSrc := tsourse.NewSource(tssPrm)
|
|
||||||
|
|
||||||
tombstoneSource := tombstone.NewChecker(
|
|
||||||
tombstone.WithLogger(c.log),
|
|
||||||
tombstone.WithTombstoneSource(tombstoneSrc),
|
|
||||||
)
|
|
||||||
|
|
||||||
var shardsAttached int
|
var shardsAttached int
|
||||||
for _, optsWithMeta := range c.shardOpts() {
|
for _, optsWithMeta := range c.shardOpts() {
|
||||||
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...)
|
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
|
@ -1059,7 +1050,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
||||||
|
|
||||||
var rcfg engine.ReConfiguration
|
var rcfg engine.ReConfiguration
|
||||||
for _, optsWithID := range c.shardOpts() {
|
for _, optsWithID := range c.shardOpts() {
|
||||||
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
|
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource())))
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
|
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
|
||||||
|
@ -1080,6 +1071,18 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
||||||
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
||||||
|
var tssPrm tsourse.TombstoneSourcePrm
|
||||||
|
tssPrm.SetGetService(c.cfgObject.getSvc)
|
||||||
|
tombstoneSrc := tsourse.NewSource(tssPrm)
|
||||||
|
|
||||||
|
tombstoneSource := tombstone.NewChecker(
|
||||||
|
tombstone.WithLogger(c.log),
|
||||||
|
tombstone.WithTombstoneSource(tombstoneSrc),
|
||||||
|
)
|
||||||
|
return tombstoneSource
|
||||||
|
}
|
||||||
|
|
||||||
func (c *cfg) shutdown() {
|
func (c *cfg) shutdown() {
|
||||||
c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
|
c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
|
||||||
|
|
||||||
|
|
|
@ -146,8 +146,8 @@ func (gc *gc) listenEvents(ctx context.Context) {
|
||||||
h := v.handlers[i]
|
h := v.handlers[i]
|
||||||
|
|
||||||
err := gc.workerPool.Submit(func() {
|
err := gc.workerPool.Submit(func() {
|
||||||
|
defer v.prevGroup.Done()
|
||||||
h(runCtx, event)
|
h(runCtx, event)
|
||||||
v.prevGroup.Done()
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,
|
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,
|
||||||
|
|
Loading…
Reference in a new issue