[#18] Add handlers for AddPeer and UpdatePeer events

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2020-09-03 17:55:11 +03:00
parent 7bce9a3d87
commit 92f448f303
3 changed files with 134 additions and 1 deletions

View file

@ -1,6 +1,8 @@
package netmap
import (
"encoding/hex"
timerEvent "github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
@ -38,3 +40,40 @@ func (np *Processor) handleNewEpoch(ev event.Event) {
zap.Int("capacity", np.pool.Cap()))
}
}
func (np *Processor) handleAddPeer(ev event.Event) {
newPeer := ev.(netmapEvent.AddPeer) // todo: check panic in production
np.log.Info("notification",
zap.String("type", "add peer"),
)
// send event to the worker pool
err := np.pool.Submit(func() {
np.processAddPeer(newPeer.Node())
})
if err != nil {
// todo: move into controlled degradation stage
np.log.Warn("netmap worker pool drained",
zap.Int("capacity", np.pool.Cap()))
}
}
func (np *Processor) handleUpdateState(ev event.Event) {
updPeer := ev.(netmapEvent.UpdatePeer) // todo: check panic in production
np.log.Info("notification",
zap.String("type", "update peer state"),
zap.String("key", hex.EncodeToString(updPeer.PublicKey().Bytes())))
// send event to the worker pool
err := np.pool.Submit(func() {
np.processUpdatePeer(updPeer)
})
if err != nil {
// todo: move into controlled degradation stage
np.log.Warn("netmap worker pool drained",
zap.Int("capacity", np.pool.Cap()))
}
}

View file

@ -0,0 +1,64 @@
package netmap
import (
"encoding/hex"
"github.com/nspcc-dev/neofs-api-go/v2/netmap/grpc"
"github.com/nspcc-dev/neofs-node/pkg/innerring/invoke"
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"go.uber.org/zap"
)
// Process add peer notification by sanity check of new node
// local epoch timer.
func (np *Processor) processAddPeer(node []byte) {
if !np.activeState.IsActive() {
np.log.Info("passive mode, ignore new peer notification")
return
}
// unmarshal grpc (any transport) version of node info from API v2
nodeInfo := new(netmap.NodeInfo)
err := nodeInfo.Unmarshal(node)
if err != nil {
// it will be nice to have tx id at event structure to log it
np.log.Warn("can't parse network map candidate")
return
}
np.log.Info("approving network map candidate",
zap.String("key", hex.EncodeToString(nodeInfo.PublicKey)),
)
err = invoke.ApprovePeer(np.morphClient, np.netmapContract, node)
if err != nil {
np.log.Error("can't invoke netmap.AddPeer", zap.Error(err))
}
}
// Process new epoch tick by invoking new epoch method in network map contract.
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
if !np.activeState.IsActive() {
np.log.Info("passive mode, ignore new epoch tick")
return
}
// better use unified enum from neofs-api-go/v2/netmap package
if ev.Status() != uint32(netmap.NodeInfo_OFFLINE) {
np.log.Warn("node proposes unknown state",
zap.String("key", hex.EncodeToString(ev.PublicKey().Bytes())),
zap.Uint32("status", ev.Status()),
)
return
}
err := invoke.UpdatePeerState(np.morphClient, np.netmapContract,
&invoke.UpdatePeerArgs{
Key: ev.PublicKey(),
Status: ev.Status(),
})
if err != nil {
np.log.Error("can't invoke netmap.UpdatePeer", zap.Error(err))
}
}

View file

@ -53,7 +53,9 @@ type (
)
const (
newEpochNotification = "NewEpoch"
newEpochNotification = "NewEpoch"
addPeerNotification = "AddPeer"
updatePeerStateNotification = "UpdateState"
)
// New creates network map contract processor instance.
@ -100,6 +102,20 @@ func (np *Processor) ListenerParsers() []event.ParserInfo {
newEpoch.SetParser(netmapEvent.ParseNewEpoch)
parsers = append(parsers, newEpoch)
// new peer event
addPeer := event.ParserInfo{}
addPeer.SetType(addPeerNotification)
addPeer.SetScriptHash(np.netmapContract)
addPeer.SetParser(netmapEvent.ParseAddPeer)
parsers = append(parsers, addPeer)
// update peer event
updatePeer := event.ParserInfo{}
updatePeer.SetType(updatePeerStateNotification)
updatePeer.SetScriptHash(np.netmapContract)
updatePeer.SetParser(netmapEvent.ParseUpdatePeer)
parsers = append(parsers, updatePeer)
return parsers
}
@ -114,6 +130,20 @@ func (np *Processor) ListenerHandlers() []event.HandlerInfo {
newEpoch.SetHandler(np.handleNewEpoch)
handlers = append(handlers, newEpoch)
// new peer handler
addPeer := event.HandlerInfo{}
addPeer.SetType(addPeerNotification)
addPeer.SetScriptHash(np.netmapContract)
addPeer.SetHandler(np.handleAddPeer)
handlers = append(handlers, addPeer)
// update peer handler
updatePeer := event.HandlerInfo{}
updatePeer.SetType(updatePeerStateNotification)
updatePeer.SetScriptHash(np.netmapContract)
updatePeer.SetHandler(np.handleUpdateState)
handlers = append(handlers, updatePeer)
return handlers
}