[#987] ir: Add RemoveNode parser and handler

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2021-11-26 17:16:03 +03:00 committed by LeL
parent 81dc17718e
commit 214c2bd0cb
4 changed files with 115 additions and 0 deletions

View file

@ -662,6 +662,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
locodeValidator,
),
NotaryDisabled: server.sideNotaryConfig.disabled,
SubnetContract: &server.contracts.subnet,
})
if err != nil {
return nil, err

View file

@ -6,6 +6,7 @@ import (
timerEvent "github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/event/subnet"
"go.uber.org/zap"
)
@ -99,3 +100,22 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
zap.Int("capacity", np.pool.Cap()))
}
}
func (np *Processor) handleRemoveNode(ev event.Event) {
removeNode := ev.(subnet.RemoveNode)
np.log.Info("notification",
zap.String("type", "remove node from subnet"),
zap.String("subnetID", hex.EncodeToString(removeNode.SubnetworkID())),
zap.String("key", hex.EncodeToString(removeNode.Node())),
)
err := np.pool.Submit(func() {
np.processRemoveSubnetNode(removeNode)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn("netmap worker pool drained",
zap.Int("capacity", np.pool.Cap()))
}
}

View file

@ -1,13 +1,16 @@
package netmap
import (
"bytes"
"encoding/hex"
"sort"
"strings"
netmapclient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
subnetEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/subnet"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
subnetid "github.com/nspcc-dev/neofs-sdk-go/subnet/id"
"go.uber.org/zap"
)
@ -142,3 +145,75 @@ func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
np.log.Error("can't invoke netmap.UpdatePeer", zap.Error(err))
}
}
func (np *Processor) processRemoveSubnetNode(ev subnetEvent.RemoveNode) {
if !np.alphabetState.IsAlphabet() {
np.log.Info("non alphabet mode, ignore remove node from subnet notification")
return
}
candidates, err := np.netmapClient.GetCandidates()
if err != nil {
np.log.Warn("could not get network map candidates",
zap.Error(err),
)
return
}
rawSubnet := ev.SubnetworkID()
subnetToRemoveFrom := &subnetid.ID{}
err = subnetToRemoveFrom.Unmarshal(rawSubnet)
if err != nil {
np.log.Warn("could not unmarshal subnet id",
zap.Error(err),
)
return
}
if subnetid.IsZero(*subnetToRemoveFrom) {
np.log.Warn("got zero subnet in remove node notification")
return
}
for _, node := range candidates.Nodes {
if !bytes.Equal(node.NodeInfo.PublicKey(), ev.Node()) {
continue
}
err = node.IterateSubnets(func(subNetID subnetid.ID) error {
if subNetID.Equals(subnetToRemoveFrom) {
return netmap.ErrRemoveSubnet
}
return nil
})
if err != nil {
np.log.Warn("could not iterate over subnetworks of the node", zap.Error(err))
np.log.Info("vote to remove node from netmap", zap.String("key", hex.EncodeToString(ev.Node())))
prm := netmapclient.UpdatePeerPrm{}
prm.SetKey(ev.Node())
prm.SetState(netmap.NodeStateOffline)
prm.SetHash(ev.TxHash())
err = np.netmapClient.UpdatePeerState(prm)
if err != nil {
np.log.Error("could not invoke netmap.UpdateState", zap.Error(err))
return
}
} else {
prm := netmapclient.AddPeerPrm{}
prm.SetNodeInfo(node.NodeInfo)
prm.SetHash(ev.TxHash())
err = np.netmapClient.AddPeer(prm)
if err != nil {
np.log.Error("could not invoke netmap.AddPeer", zap.Error(err))
return
}
}
break
}
}

View file

@ -5,10 +5,12 @@ import (
"fmt"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/util"
container "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
nmWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
subnetEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/subnet"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
@ -60,6 +62,8 @@ type (
netmapClient *nmWrapper.Wrapper
containerWrp *container.Wrapper
subnetContract util.Uint160
netmapSnapshot cleanupTable
handleNewAudit event.Handler
@ -83,6 +87,7 @@ type (
CleanupEnabled bool
CleanupThreshold uint64 // in epochs
ContainerWrapper *container.Wrapper
SubnetContract *util.Uint160
HandleAudit event.Handler
AuditSettlementsHandler event.Handler
@ -99,6 +104,7 @@ const (
newEpochNotification = "NewEpoch"
addPeerNotification = "AddPeer"
updatePeerStateNotification = "UpdateState"
removeNodeNotification = "RemoveNode"
)
// New creates network map contract processor instance.
@ -124,6 +130,8 @@ func New(p *Params) (*Processor, error) {
return nil, errors.New("ir/netmap: container contract wrapper is not set")
case p.NodeValidator == nil:
return nil, errors.New("ir/netmap: node validator is not set")
case p.SubnetContract == nil:
return nil, errors.New("ir/netmap: subnet contract script hash is not set")
}
p.Log.Debug("netmap worker pool", zap.Int("size", p.PoolSize))
@ -143,6 +151,7 @@ func New(p *Params) (*Processor, error) {
containerWrp: p.ContainerWrapper,
netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold),
handleNewAudit: p.HandleAudit,
subnetContract: *p.SubnetContract,
handleAuditSettlements: p.AuditSettlementsHandler,
@ -162,6 +171,11 @@ func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInf
var p event.NotificationParserInfo
// remove node from subnetwork event
p.SetScriptHash(np.subnetContract)
p.SetType(removeNodeNotification)
p.SetParser(subnetEvent.ParseRemoveNode)
p.SetScriptHash(np.netmapClient.ContractAddress())
// new epoch event
@ -192,6 +206,11 @@ func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerI
var i event.NotificationHandlerInfo
// remove node from subnetwork event
i.SetScriptHash(np.subnetContract)
i.SetType(removeNodeNotification)
i.SetHandler(np.handleRemoveNode)
i.SetScriptHash(np.netmapClient.ContractAddress())
// new epoch handler