From 94957dd38cb08a1695567c310ee12814e6113265 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Thu, 29 Oct 2020 19:08:36 +0300 Subject: [PATCH] [#132] Generate and handle netmap cleanup event Signed-off-by: Alex Vanin --- pkg/innerring/processors/netmap/handlers.go | 22 +++++++++++++++++++ .../processors/netmap/process_epoch.go | 12 ++++++++++ .../processors/netmap/process_peers.go | 5 +++-- 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/pkg/innerring/processors/netmap/handlers.go b/pkg/innerring/processors/netmap/handlers.go index bd0b1cdee..2e11891d3 100644 --- a/pkg/innerring/processors/netmap/handlers.go +++ b/pkg/innerring/processors/netmap/handlers.go @@ -77,3 +77,25 @@ func (np *Processor) handleUpdateState(ev event.Event) { zap.Int("capacity", np.pool.Cap())) } } + +func (np *Processor) handleCleanupTick(ev event.Event) { + if !np.netmapSnapshot.enabled { + np.log.Debug("netmap clean up routine is disabled") + + return + } + + cleanup := ev.(netmapCleanupTick) + + np.log.Info("tick", zap.String("type", "netmap cleaner")) + + // send event to the worker pool + err := np.pool.Submit(func() { + np.processNetmapCleanupTick(cleanup.epoch) + }) + if err != nil { + // there system can be moved into controlled degradation stage + np.log.Warn("netmap worker pool drained", + zap.Int("capacity", np.pool.Cap())) + } +} diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index abafd8000..8736f3122 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -10,6 +10,18 @@ import ( func (np *Processor) processNewEpoch(epoch uint64) { np.epochState.SetEpochCounter(epoch) np.epochTimer.ResetEpochTimer() + + // get new netmap snapshot + snapshot, err := invoke.NetmapSnapshot(np.morphClient, np.netmapContract) + if err != nil { + np.log.Warn("can't get netmap snapshot to perform cleanup", + zap.String("error", err.Error())) + + return + } + + np.netmapSnapshot.update(snapshot, epoch) + np.handleCleanupTick(netmapCleanupTick{epoch: epoch}) } // Process new epoch tick by invoking new epoch method in network map contract. diff --git a/pkg/innerring/processors/netmap/process_peers.go b/pkg/innerring/processors/netmap/process_peers.go index 6420bdd14..217d67e5c 100644 --- a/pkg/innerring/processors/netmap/process_peers.go +++ b/pkg/innerring/processors/netmap/process_peers.go @@ -29,11 +29,12 @@ func (np *Processor) processAddPeer(node []byte) { } keyString := hex.EncodeToString(nodeInfo.PublicKey) - np.log.Info("approving network map candidate", - zap.String("key", keyString)) exists := np.netmapSnapshot.touch(keyString, np.epochState.EpochCounter()) if !exists { + np.log.Info("approving network map candidate", + zap.String("key", keyString)) + err = invoke.ApprovePeer(np.morphClient, np.netmapContract, node) if err != nil { np.log.Error("can't invoke netmap.AddPeer", zap.Error(err))