From 92f448f303633b44038b8caf8b82893d5d1ea262 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Thu, 3 Sep 2020 17:55:11 +0300 Subject: [PATCH] [#18] Add handlers for AddPeer and UpdatePeer events Signed-off-by: Alex Vanin --- pkg/innerring/processors/netmap/handlers.go | 39 +++++++++++ .../processors/netmap/process_peers.go | 64 +++++++++++++++++++ pkg/innerring/processors/netmap/processor.go | 32 +++++++++- 3 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 pkg/innerring/processors/netmap/process_peers.go diff --git a/pkg/innerring/processors/netmap/handlers.go b/pkg/innerring/processors/netmap/handlers.go index b403d8c7..51aba5b9 100644 --- a/pkg/innerring/processors/netmap/handlers.go +++ b/pkg/innerring/processors/netmap/handlers.go @@ -1,6 +1,8 @@ package netmap import ( + "encoding/hex" + timerEvent "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" @@ -38,3 +40,40 @@ func (np *Processor) handleNewEpoch(ev event.Event) { zap.Int("capacity", np.pool.Cap())) } } + +func (np *Processor) handleAddPeer(ev event.Event) { + newPeer := ev.(netmapEvent.AddPeer) // todo: check panic in production + + np.log.Info("notification", + zap.String("type", "add peer"), + ) + + // send event to the worker pool + + err := np.pool.Submit(func() { + np.processAddPeer(newPeer.Node()) + }) + if err != nil { + // todo: move into controlled degradation stage + np.log.Warn("netmap worker pool drained", + zap.Int("capacity", np.pool.Cap())) + } +} + +func (np *Processor) handleUpdateState(ev event.Event) { + updPeer := ev.(netmapEvent.UpdatePeer) // todo: check panic in production + np.log.Info("notification", + zap.String("type", "update peer state"), + zap.String("key", hex.EncodeToString(updPeer.PublicKey().Bytes()))) + + // send event to the worker pool + + err := np.pool.Submit(func() { + np.processUpdatePeer(updPeer) + }) + if err != nil { + // todo: move into controlled degradation stage + np.log.Warn("netmap worker pool drained", + zap.Int("capacity", np.pool.Cap())) + } +} diff --git a/pkg/innerring/processors/netmap/process_peers.go b/pkg/innerring/processors/netmap/process_peers.go new file mode 100644 index 00000000..6e5822fa --- /dev/null +++ b/pkg/innerring/processors/netmap/process_peers.go @@ -0,0 +1,64 @@ +package netmap + +import ( + "encoding/hex" + + "github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc" + "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" + netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" + "go.uber.org/zap" +) + +// Process add peer notification by sanity check of new node +// local epoch timer. +func (np *Processor) processAddPeer(node []byte) { + if !np.activeState.IsActive() { + np.log.Info("passive mode, ignore new peer notification") + return + } + + // unmarshal grpc (any transport) version of node info from API v2 + nodeInfo := new(netmap.NodeInfo) + + err := nodeInfo.Unmarshal(node) + if err != nil { + // it will be nice to have tx id at event structure to log it + np.log.Warn("can't parse network map candidate") + return + } + + np.log.Info("approving network map candidate", + zap.String("key", hex.EncodeToString(nodeInfo.PublicKey)), + ) + + err = invoke.ApprovePeer(np.morphClient, np.netmapContract, node) + if err != nil { + np.log.Error("can't invoke netmap.AddPeer", zap.Error(err)) + } +} + +// Process new epoch tick by invoking new epoch method in network map contract. +func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) { + if !np.activeState.IsActive() { + np.log.Info("passive mode, ignore new epoch tick") + return + } + + // better use unified enum from neofs-api-go/v2/netmap package + if ev.Status() != uint32(netmap.NodeInfo_OFFLINE) { + np.log.Warn("node proposes unknown state", + zap.String("key", hex.EncodeToString(ev.PublicKey().Bytes())), + zap.Uint32("status", ev.Status()), + ) + return + } + + err := invoke.UpdatePeerState(np.morphClient, np.netmapContract, + &invoke.UpdatePeerArgs{ + Key: ev.PublicKey(), + Status: ev.Status(), + }) + if err != nil { + np.log.Error("can't invoke netmap.UpdatePeer", zap.Error(err)) + } +} diff --git a/pkg/innerring/processors/netmap/processor.go b/pkg/innerring/processors/netmap/processor.go index d762c7b0..2827523b 100644 --- a/pkg/innerring/processors/netmap/processor.go +++ b/pkg/innerring/processors/netmap/processor.go @@ -53,7 +53,9 @@ type ( ) const ( - newEpochNotification = "NewEpoch" + newEpochNotification = "NewEpoch" + addPeerNotification = "AddPeer" + updatePeerStateNotification = "UpdateState" ) // New creates network map contract processor instance. @@ -100,6 +102,20 @@ func (np *Processor) ListenerParsers() []event.ParserInfo { newEpoch.SetParser(netmapEvent.ParseNewEpoch) parsers = append(parsers, newEpoch) + // new peer event + addPeer := event.ParserInfo{} + addPeer.SetType(addPeerNotification) + addPeer.SetScriptHash(np.netmapContract) + addPeer.SetParser(netmapEvent.ParseAddPeer) + parsers = append(parsers, addPeer) + + // update peer event + updatePeer := event.ParserInfo{} + updatePeer.SetType(updatePeerStateNotification) + updatePeer.SetScriptHash(np.netmapContract) + updatePeer.SetParser(netmapEvent.ParseUpdatePeer) + parsers = append(parsers, updatePeer) + return parsers } @@ -114,6 +130,20 @@ func (np *Processor) ListenerHandlers() []event.HandlerInfo { newEpoch.SetHandler(np.handleNewEpoch) handlers = append(handlers, newEpoch) + // new peer handler + addPeer := event.HandlerInfo{} + addPeer.SetType(addPeerNotification) + addPeer.SetScriptHash(np.netmapContract) + addPeer.SetHandler(np.handleAddPeer) + handlers = append(handlers, addPeer) + + // update peer handler + updatePeer := event.HandlerInfo{} + updatePeer.SetType(updatePeerStateNotification) + updatePeer.SetScriptHash(np.netmapContract) + updatePeer.SetHandler(np.handleUpdateState) + handlers = append(handlers, updatePeer) + return handlers }