Create tombstone source when reload SUPPORT #392
1 changed files with 15 additions and 12 deletions
|
@ -864,22 +864,13 @@ func initLocalStorage(c *cfg) {
|
||||||
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
|
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
|
||||||
})
|
})
|
||||||
|
|
||||||
// allocate memory for the service;
|
// allocate memory for the service to create tombstone source;
|
||||||
// service will be created later
|
// service will be created later
|
||||||
c.cfgObject.getSvc = new(getsvc.Service)
|
c.cfgObject.getSvc = new(getsvc.Service)
|
||||||
|
|
||||||
var tssPrm tsourse.TombstoneSourcePrm
|
|
||||||
tssPrm.SetGetService(c.cfgObject.getSvc)
|
|
||||||
tombstoneSrc := tsourse.NewSource(tssPrm)
|
|
||||||
|
|
||||||
tombstoneSource := tombstone.NewChecker(
|
|
||||||
tombstone.WithLogger(c.log),
|
|
||||||
tombstone.WithTombstoneSource(tombstoneSrc),
|
|
||||||
)
|
|
||||||
|
|
||||||
var shardsAttached int
|
var shardsAttached int
|
||||||
for _, optsWithMeta := range c.shardOpts() {
|
for _, optsWithMeta := range c.shardOpts() {
|
||||||
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(tombstoneSource))...)
|
id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Error("failed to attach shard to engine", zap.Error(err))
|
c.log.Error("failed to attach shard to engine", zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
|
@ -1080,7 +1071,7 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
||||||
|
|
||||||
var rcfg engine.ReConfiguration
|
var rcfg engine.ReConfiguration
|
||||||
for _, optsWithID := range c.shardOpts() {
|
for _, optsWithID := range c.shardOpts() {
|
||||||
rcfg.AddShard(optsWithID.configID, optsWithID.shOpts)
|
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource())))
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
|
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
|
||||||
|
@ -1101,6 +1092,18 @@ func (c *cfg) reloadConfig(ctx context.Context) {
|
||||||
c.log.Info("configuration has been reloaded successfully")
|
c.log.Info("configuration has been reloaded successfully")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
||||||
|
var tssPrm tsourse.TombstoneSourcePrm
|
||||||
|
tssPrm.SetGetService(c.cfgObject.getSvc)
|
||||||
|
tombstoneSrc := tsourse.NewSource(tssPrm)
|
||||||
|
|
||||||
|
tombstoneSource := tombstone.NewChecker(
|
||||||
|
tombstone.WithLogger(c.log),
|
||||||
|
tombstone.WithTombstoneSource(tombstoneSrc),
|
||||||
|
)
|
||||||
|
return tombstoneSource
|
||||||
|
}
|
||||||
|
|
||||||
func (c *cfg) shutdown() {
|
func (c *cfg) shutdown() {
|
||||||
c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
|
c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue