From 214c2bd0cb4bc1a465d5e4c6ab8d3532b78dedb6 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 26 Nov 2021 17:16:03 +0300 Subject: [PATCH] [#987] ir: Add `RemoveNode` parser and handler Signed-off-by: Pavel Karpy --- pkg/innerring/innerring.go | 1 + pkg/innerring/processors/netmap/handlers.go | 20 +++++ .../processors/netmap/process_peers.go | 75 +++++++++++++++++++ pkg/innerring/processors/netmap/processor.go | 19 +++++ 4 files changed, 115 insertions(+) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 37646cc51..07914d57c 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -662,6 +662,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error locodeValidator, ), NotaryDisabled: server.sideNotaryConfig.disabled, + SubnetContract: &server.contracts.subnet, }) if err != nil { return nil, err diff --git a/pkg/innerring/processors/netmap/handlers.go b/pkg/innerring/processors/netmap/handlers.go index acd34f764..4d91f882b 100644 --- a/pkg/innerring/processors/netmap/handlers.go +++ b/pkg/innerring/processors/netmap/handlers.go @@ -6,6 +6,7 @@ import ( timerEvent "github.com/nspcc-dev/neofs-node/pkg/innerring/timers" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" + "github.com/nspcc-dev/neofs-node/pkg/morph/event/subnet" "go.uber.org/zap" ) @@ -99,3 +100,22 @@ func (np *Processor) handleCleanupTick(ev event.Event) { zap.Int("capacity", np.pool.Cap())) } } + +func (np *Processor) handleRemoveNode(ev event.Event) { + removeNode := ev.(subnet.RemoveNode) + + np.log.Info("notification", + zap.String("type", "remove node from subnet"), + zap.String("subnetID", hex.EncodeToString(removeNode.SubnetworkID())), + zap.String("key", hex.EncodeToString(removeNode.Node())), + ) + + err := np.pool.Submit(func() { + np.processRemoveSubnetNode(removeNode) + }) + if err != nil { + // there system can be moved into controlled degradation stage + np.log.Warn("netmap worker pool drained", + zap.Int("capacity", np.pool.Cap())) + } +} diff --git a/pkg/innerring/processors/netmap/process_peers.go b/pkg/innerring/processors/netmap/process_peers.go index 3026e6456..55ba9c80f 100644 --- a/pkg/innerring/processors/netmap/process_peers.go +++ b/pkg/innerring/processors/netmap/process_peers.go @@ -1,13 +1,16 @@ package netmap import ( + "bytes" "encoding/hex" "sort" "strings" netmapclient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" + subnetEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/subnet" "github.com/nspcc-dev/neofs-sdk-go/netmap" + subnetid "github.com/nspcc-dev/neofs-sdk-go/subnet/id" "go.uber.org/zap" ) @@ -142,3 +145,75 @@ func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) { np.log.Error("can't invoke netmap.UpdatePeer", zap.Error(err)) } } + +func (np *Processor) processRemoveSubnetNode(ev subnetEvent.RemoveNode) { + if !np.alphabetState.IsAlphabet() { + np.log.Info("non alphabet mode, ignore remove node from subnet notification") + return + } + + candidates, err := np.netmapClient.GetCandidates() + if err != nil { + np.log.Warn("could not get network map candidates", + zap.Error(err), + ) + return + } + + rawSubnet := ev.SubnetworkID() + subnetToRemoveFrom := &subnetid.ID{} + + err = subnetToRemoveFrom.Unmarshal(rawSubnet) + if err != nil { + np.log.Warn("could not unmarshal subnet id", + zap.Error(err), + ) + return + } + + if subnetid.IsZero(*subnetToRemoveFrom) { + np.log.Warn("got zero subnet in remove node notification") + return + } + + for _, node := range candidates.Nodes { + if !bytes.Equal(node.NodeInfo.PublicKey(), ev.Node()) { + continue + } + + err = node.IterateSubnets(func(subNetID subnetid.ID) error { + if subNetID.Equals(subnetToRemoveFrom) { + return netmap.ErrRemoveSubnet + } + + return nil + }) + if err != nil { + np.log.Warn("could not iterate over subnetworks of the node", zap.Error(err)) + np.log.Info("vote to remove node from netmap", zap.String("key", hex.EncodeToString(ev.Node()))) + + prm := netmapclient.UpdatePeerPrm{} + prm.SetKey(ev.Node()) + prm.SetState(netmap.NodeStateOffline) + prm.SetHash(ev.TxHash()) + + err = np.netmapClient.UpdatePeerState(prm) + if err != nil { + np.log.Error("could not invoke netmap.UpdateState", zap.Error(err)) + return + } + } else { + prm := netmapclient.AddPeerPrm{} + prm.SetNodeInfo(node.NodeInfo) + prm.SetHash(ev.TxHash()) + + err = np.netmapClient.AddPeer(prm) + if err != nil { + np.log.Error("could not invoke netmap.AddPeer", zap.Error(err)) + return + } + } + + break + } +} diff --git a/pkg/innerring/processors/netmap/processor.go b/pkg/innerring/processors/netmap/processor.go index 9d06f6371..ba7630bee 100644 --- a/pkg/innerring/processors/netmap/processor.go +++ b/pkg/innerring/processors/netmap/processor.go @@ -5,10 +5,12 @@ import ( "fmt" "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" + "github.com/nspcc-dev/neo-go/pkg/util" container "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" nmWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/event" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" + subnetEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/subnet" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/panjf2000/ants/v2" "go.uber.org/zap" @@ -60,6 +62,8 @@ type ( netmapClient *nmWrapper.Wrapper containerWrp *container.Wrapper + subnetContract util.Uint160 + netmapSnapshot cleanupTable handleNewAudit event.Handler @@ -83,6 +87,7 @@ type ( CleanupEnabled bool CleanupThreshold uint64 // in epochs ContainerWrapper *container.Wrapper + SubnetContract *util.Uint160 HandleAudit event.Handler AuditSettlementsHandler event.Handler @@ -99,6 +104,7 @@ const ( newEpochNotification = "NewEpoch" addPeerNotification = "AddPeer" updatePeerStateNotification = "UpdateState" + removeNodeNotification = "RemoveNode" ) // New creates network map contract processor instance. @@ -124,6 +130,8 @@ func New(p *Params) (*Processor, error) { return nil, errors.New("ir/netmap: container contract wrapper is not set") case p.NodeValidator == nil: return nil, errors.New("ir/netmap: node validator is not set") + case p.SubnetContract == nil: + return nil, errors.New("ir/netmap: subnet contract script hash is not set") } p.Log.Debug("netmap worker pool", zap.Int("size", p.PoolSize)) @@ -143,6 +151,7 @@ func New(p *Params) (*Processor, error) { containerWrp: p.ContainerWrapper, netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold), handleNewAudit: p.HandleAudit, + subnetContract: *p.SubnetContract, handleAuditSettlements: p.AuditSettlementsHandler, @@ -162,6 +171,11 @@ func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInf var p event.NotificationParserInfo + // remove node from subnetwork event + p.SetScriptHash(np.subnetContract) + p.SetType(removeNodeNotification) + p.SetParser(subnetEvent.ParseRemoveNode) + p.SetScriptHash(np.netmapClient.ContractAddress()) // new epoch event @@ -192,6 +206,11 @@ func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerI var i event.NotificationHandlerInfo + // remove node from subnetwork event + i.SetScriptHash(np.subnetContract) + i.SetType(removeNodeNotification) + i.SetHandler(np.handleRemoveNode) + i.SetScriptHash(np.netmapClient.ContractAddress()) // new epoch handler