forked from TrueCloudLab/frostfs-node
[#132] Generate and handle netmap cleanup event
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
5c7de27546
commit
94957dd38c
3 changed files with 37 additions and 2 deletions
|
@ -77,3 +77,25 @@ func (np *Processor) handleUpdateState(ev event.Event) {
|
||||||
zap.Int("capacity", np.pool.Cap()))
|
zap.Int("capacity", np.pool.Cap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (np *Processor) handleCleanupTick(ev event.Event) {
|
||||||
|
if !np.netmapSnapshot.enabled {
|
||||||
|
np.log.Debug("netmap clean up routine is disabled")
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup := ev.(netmapCleanupTick)
|
||||||
|
|
||||||
|
np.log.Info("tick", zap.String("type", "netmap cleaner"))
|
||||||
|
|
||||||
|
// send event to the worker pool
|
||||||
|
err := np.pool.Submit(func() {
|
||||||
|
np.processNetmapCleanupTick(cleanup.epoch)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// there system can be moved into controlled degradation stage
|
||||||
|
np.log.Warn("netmap worker pool drained",
|
||||||
|
zap.Int("capacity", np.pool.Cap()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -10,6 +10,18 @@ import (
|
||||||
func (np *Processor) processNewEpoch(epoch uint64) {
|
func (np *Processor) processNewEpoch(epoch uint64) {
|
||||||
np.epochState.SetEpochCounter(epoch)
|
np.epochState.SetEpochCounter(epoch)
|
||||||
np.epochTimer.ResetEpochTimer()
|
np.epochTimer.ResetEpochTimer()
|
||||||
|
|
||||||
|
// get new netmap snapshot
|
||||||
|
snapshot, err := invoke.NetmapSnapshot(np.morphClient, np.netmapContract)
|
||||||
|
if err != nil {
|
||||||
|
np.log.Warn("can't get netmap snapshot to perform cleanup",
|
||||||
|
zap.String("error", err.Error()))
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
np.netmapSnapshot.update(snapshot, epoch)
|
||||||
|
np.handleCleanupTick(netmapCleanupTick{epoch: epoch})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process new epoch tick by invoking new epoch method in network map contract.
|
// Process new epoch tick by invoking new epoch method in network map contract.
|
||||||
|
|
|
@ -29,11 +29,12 @@ func (np *Processor) processAddPeer(node []byte) {
|
||||||
}
|
}
|
||||||
|
|
||||||
keyString := hex.EncodeToString(nodeInfo.PublicKey)
|
keyString := hex.EncodeToString(nodeInfo.PublicKey)
|
||||||
np.log.Info("approving network map candidate",
|
|
||||||
zap.String("key", keyString))
|
|
||||||
|
|
||||||
exists := np.netmapSnapshot.touch(keyString, np.epochState.EpochCounter())
|
exists := np.netmapSnapshot.touch(keyString, np.epochState.EpochCounter())
|
||||||
if !exists {
|
if !exists {
|
||||||
|
np.log.Info("approving network map candidate",
|
||||||
|
zap.String("key", keyString))
|
||||||
|
|
||||||
err = invoke.ApprovePeer(np.morphClient, np.netmapContract, node)
|
err = invoke.ApprovePeer(np.morphClient, np.netmapContract, node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Error("can't invoke netmap.AddPeer", zap.Error(err))
|
np.log.Error("can't invoke netmap.AddPeer", zap.Error(err))
|
||||||
|
|
Loading…
Reference in a new issue