forked from TrueCloudLab/frostfs-node
333 lines
8.9 KiB
Go
333 lines
8.9 KiB
Go
|
package innerring
|
||
|
|
||
|
import (
|
||
|
"crypto/ecdsa"
|
||
|
"crypto/elliptic"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
|
||
|
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
|
||
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||
|
irsubnet "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/subnet"
|
||
|
morphsubnet "github.com/nspcc-dev/neofs-node/pkg/morph/client/subnet"
|
||
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||
|
subnetevents "github.com/nspcc-dev/neofs-node/pkg/morph/event/subnet"
|
||
|
"github.com/nspcc-dev/neofs-node/pkg/util"
|
||
|
"github.com/nspcc-dev/neofs-sdk-go/owner"
|
||
|
"github.com/nspcc-dev/neofs-sdk-go/subnet"
|
||
|
subnetid "github.com/nspcc-dev/neofs-sdk-go/subnet/id"
|
||
|
"github.com/panjf2000/ants/v2"
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
// IR server's component to handle Subnet contract notifications.
|
||
|
type subnetHandler struct {
|
||
|
workerPool util.WorkerPool
|
||
|
|
||
|
morphClient morphsubnet.Client
|
||
|
|
||
|
putValidator irsubnet.PutValidator
|
||
|
|
||
|
delValidator irsubnet.DeleteValidator
|
||
|
}
|
||
|
|
||
|
// configuration of subnet component.
|
||
|
type subnetConfig struct {
|
||
|
queueSize uint32
|
||
|
}
|
||
|
|
||
|
// makes IR server to catch Subnet notifications from sidechain listener,
|
||
|
// and to release corresponding processing queue on stop.
|
||
|
func (s *Server) initSubnet(cfg subnetConfig) {
|
||
|
s.registerStarter(func() error {
|
||
|
var err error
|
||
|
|
||
|
// initialize queue for processing of the events from Subnet contract
|
||
|
s.subnetHandler.workerPool, err = ants.NewPool(int(cfg.queueSize), ants.WithNonblocking(true))
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("subnet queue initialization: %w", err)
|
||
|
}
|
||
|
|
||
|
// initialize morph client of Subnet contract
|
||
|
clientMode := morphsubnet.NotaryAlphabet
|
||
|
|
||
|
if s.sideNotaryConfig.disabled {
|
||
|
clientMode = morphsubnet.NonNotary
|
||
|
}
|
||
|
|
||
|
var initPrm morphsubnet.InitPrm
|
||
|
|
||
|
initPrm.SetBaseClient(s.morphClient)
|
||
|
initPrm.SetContractAddress(s.contracts.subnet)
|
||
|
initPrm.SetMode(clientMode)
|
||
|
|
||
|
err = s.subnetHandler.morphClient.Init(initPrm)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("init morph subnet client: %w", err)
|
||
|
}
|
||
|
|
||
|
s.listenSubnet()
|
||
|
|
||
|
return nil
|
||
|
})
|
||
|
|
||
|
s.registerCloser(func() error {
|
||
|
s.stopSubnet()
|
||
|
return nil
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// releases the Subnet contract notification processing queue.
|
||
|
func (s *Server) stopSubnet() {
|
||
|
s.workerPool.Release()
|
||
|
}
|
||
|
|
||
|
// names of listened notification events from Subnet contract.
|
||
|
const (
|
||
|
// subnet creation
|
||
|
subnetCreateEvName = "put"
|
||
|
// subnet removal
|
||
|
subnetRemoveEvName = "delete"
|
||
|
)
|
||
|
|
||
|
// makes IR server to listen notifications of Subnet contract.
|
||
|
// All required resources must be initialized before (initSubnet).
|
||
|
// Works in one of two modes (configured): notary and non-notary.
|
||
|
//
|
||
|
// All handlers are executed only if local node is an alphabet one.
|
||
|
//
|
||
|
// Events (notary):
|
||
|
// * put (parser: subnetevents.ParseNotaryPut, handler: catchSubnetCreation);
|
||
|
// * delete (parser: subnetevents.ParseNotaryDelete, handler: catchSubnetRemoval).
|
||
|
//
|
||
|
// Events (non-notary):
|
||
|
// * put (parser: subnetevents.ParsePut, handler: catchSubnetCreation);
|
||
|
// * delete (parser: subnetevents.ParseDelete, handler: catchSubnetCreation).
|
||
|
func (s *Server) listenSubnet() {
|
||
|
if s.sideNotaryConfig.disabled {
|
||
|
s.listenSubnetWithoutNotary()
|
||
|
return
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
parserInfo event.NotaryParserInfo
|
||
|
handlerInfo event.NotaryHandlerInfo
|
||
|
)
|
||
|
|
||
|
parserInfo.SetScriptHash(s.contracts.subnet)
|
||
|
handlerInfo.SetScriptHash(s.contracts.subnet)
|
||
|
|
||
|
listenEvent := func(notifyName string, parser event.NotaryParser, handler event.Handler) {
|
||
|
notifyTyp := event.NotaryTypeFromString(notifyName)
|
||
|
|
||
|
parserInfo.SetMempoolType(mempoolevent.TransactionAdded)
|
||
|
handlerInfo.SetMempoolType(mempoolevent.TransactionAdded)
|
||
|
|
||
|
parserInfo.SetParser(parser)
|
||
|
handlerInfo.SetHandler(handler)
|
||
|
|
||
|
parserInfo.SetRequestType(notifyTyp)
|
||
|
handlerInfo.SetRequestType(notifyTyp)
|
||
|
|
||
|
s.morphListener.SetNotaryParser(parserInfo)
|
||
|
s.morphListener.RegisterNotaryHandler(handlerInfo)
|
||
|
}
|
||
|
|
||
|
// subnet creation
|
||
|
listenEvent(subnetCreateEvName, subnetevents.ParseNotaryPut, s.onlyAlphabetEventHandler(s.catchSubnetCreation))
|
||
|
// subnet removal
|
||
|
listenEvent(subnetRemoveEvName, subnetevents.ParseNotaryDelete, s.onlyAlphabetEventHandler(s.catchSubnetRemoval))
|
||
|
}
|
||
|
|
||
|
func (s *Server) listenSubnetWithoutNotary() {
|
||
|
var (
|
||
|
parserInfo event.NotificationParserInfo
|
||
|
handlerInfo event.NotificationHandlerInfo
|
||
|
)
|
||
|
|
||
|
parserInfo.SetScriptHash(s.contracts.subnet)
|
||
|
handlerInfo.SetScriptHash(s.contracts.subnet)
|
||
|
|
||
|
listenEvent := func(notifyName string, parser event.NotificationParser, handler event.Handler) {
|
||
|
notifyTyp := event.TypeFromString(notifyName)
|
||
|
|
||
|
parserInfo.SetType(notifyTyp)
|
||
|
handlerInfo.SetType(notifyTyp)
|
||
|
|
||
|
parserInfo.SetParser(parser)
|
||
|
handlerInfo.SetHandler(handler)
|
||
|
|
||
|
s.morphListener.SetNotificationParser(parserInfo)
|
||
|
s.morphListener.RegisterNotificationHandler(handlerInfo)
|
||
|
}
|
||
|
|
||
|
// subnet creation
|
||
|
listenEvent(subnetCreateEvName, subnetevents.ParsePut, s.onlyAlphabetEventHandler(s.catchSubnetCreation))
|
||
|
// subnet removal
|
||
|
listenEvent(subnetRemoveEvName, subnetevents.ParseDelete, s.onlyAlphabetEventHandler(s.catchSubnetRemoval))
|
||
|
}
|
||
|
|
||
|
// catchSubnetCreation catches event of subnet creation from listener and queues the processing.
|
||
|
func (s *Server) catchSubnetCreation(e event.Event) {
|
||
|
err := s.subnetHandler.workerPool.Submit(func() {
|
||
|
s.handleSubnetCreation(e)
|
||
|
})
|
||
|
if err != nil {
|
||
|
s.log.Error("subnet creation queue failure",
|
||
|
zap.String("error", err.Error()),
|
||
|
)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// implements irsubnet.Put event interface required by irsubnet.PutValidator.
|
||
|
type putSubnetEvent struct {
|
||
|
ev subnetevents.Put
|
||
|
}
|
||
|
|
||
|
// ReadID unmarshals subnet ID from a binary NeoFS API protocol's format.
|
||
|
func (x putSubnetEvent) ReadID(id *subnetid.ID) error {
|
||
|
return id.Unmarshal(x.ev.ID())
|
||
|
}
|
||
|
|
||
|
var errMissingSubnetOwner = errors.New("missing subnet owner")
|
||
|
|
||
|
// ReadCreator unmarshals subnet creator from a binary NeoFS API protocol's format.
|
||
|
// Returns an error if byte array is empty.
|
||
|
func (x putSubnetEvent) ReadCreator(id *owner.ID) error {
|
||
|
data := x.ev.Owner()
|
||
|
|
||
|
if len(data) == 0 {
|
||
|
return errMissingSubnetOwner
|
||
|
}
|
||
|
|
||
|
key, err := keys.NewPublicKeyFromBytes(data, elliptic.P256())
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
wal, err := owner.NEO3WalletFromPublicKey((*ecdsa.PublicKey)(key))
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// it would be better if we could do it not like this
|
||
|
*id = *owner.NewIDFromNeo3Wallet(wal)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// ReadInfo unmarshal subnet info from a binary NeoFS API protocol's format.
|
||
|
func (x putSubnetEvent) ReadInfo(info *subnet.Info) error {
|
||
|
return info.Unmarshal(x.ev.Info())
|
||
|
}
|
||
|
|
||
|
// handleSubnetCreation handles event of subnet creation parsed via subnetevents.ParsePut.
|
||
|
//
|
||
|
// Validates the event using irsubnet.PutValidator. Logs message about (dis)agreement.
|
||
|
func (s *Server) handleSubnetCreation(e event.Event) {
|
||
|
putEv := e.(subnetevents.Put) // panic occurs only if we registered handler incorrectly
|
||
|
|
||
|
err := s.subnetHandler.putValidator.Assert(putSubnetEvent{
|
||
|
ev: putEv,
|
||
|
})
|
||
|
if err != nil {
|
||
|
s.log.Info("discard subnet creation",
|
||
|
zap.String("reason", err.Error()),
|
||
|
)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
notaryMainTx := putEv.NotaryMainTx()
|
||
|
|
||
|
isNotary := notaryMainTx != nil
|
||
|
if isNotary {
|
||
|
// re-sign notary request
|
||
|
err = s.morphClient.NotarySignAndInvokeTX(notaryMainTx)
|
||
|
} else {
|
||
|
// send new transaction
|
||
|
var prm morphsubnet.PutPrm
|
||
|
|
||
|
prm.SetID(putEv.ID())
|
||
|
prm.SetOwner(putEv.Owner())
|
||
|
prm.SetInfo(putEv.Info())
|
||
|
prm.SetTxHash(putEv.TxHash())
|
||
|
|
||
|
_, err = s.subnetHandler.morphClient.Put(prm)
|
||
|
}
|
||
|
|
||
|
if err != nil {
|
||
|
s.log.Error("approve subnet creation",
|
||
|
zap.Bool("notary", isNotary),
|
||
|
zap.String("error", err.Error()),
|
||
|
)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// catchSubnetRemoval catches event of subnet removal from listener and queues the processing.
|
||
|
func (s *Server) catchSubnetRemoval(e event.Event) {
|
||
|
err := s.subnetHandler.workerPool.Submit(func() {
|
||
|
s.handleSubnetCreation(e)
|
||
|
})
|
||
|
if err != nil {
|
||
|
s.log.Error("subnet removal queue failure",
|
||
|
zap.String("error", err.Error()),
|
||
|
)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// implements irsubnet.Delete event interface required by irsubnet.DeleteValidator.
|
||
|
type deleteSubnetEvent struct {
|
||
|
ev subnetevents.Delete
|
||
|
}
|
||
|
|
||
|
// ReadID unmarshals subnet ID from a binary NeoFS API protocol's format.
|
||
|
func (x deleteSubnetEvent) ReadID(id *subnetid.ID) error {
|
||
|
return id.Unmarshal(x.ev.ID())
|
||
|
}
|
||
|
|
||
|
// handleSubnetRemoval handles event of subnet removal parsed via subnetevents.ParseDelete.
|
||
|
func (s *Server) handleSubnetRemoval(e event.Event) {
|
||
|
delEv := e.(subnetevents.Delete) // panic occurs only if we registered handler incorrectly
|
||
|
|
||
|
err := s.subnetHandler.delValidator.Assert(deleteSubnetEvent{
|
||
|
ev: delEv,
|
||
|
})
|
||
|
if err != nil {
|
||
|
s.log.Info("discard subnet removal",
|
||
|
zap.String("reason", err.Error()),
|
||
|
)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
notaryMainTx := delEv.NotaryMainTx()
|
||
|
|
||
|
isNotary := notaryMainTx != nil
|
||
|
if isNotary {
|
||
|
// re-sign notary request
|
||
|
err = s.morphClient.NotarySignAndInvokeTX(notaryMainTx)
|
||
|
} else {
|
||
|
// send new transaction
|
||
|
var prm morphsubnet.DeletePrm
|
||
|
|
||
|
prm.SetID(delEv.ID())
|
||
|
prm.SetTxHash(delEv.TxHash())
|
||
|
|
||
|
_, err = s.subnetHandler.morphClient.Delete(prm)
|
||
|
}
|
||
|
|
||
|
if err != nil {
|
||
|
s.log.Error("approve subnet removal",
|
||
|
zap.Bool("notary", isNotary),
|
||
|
zap.String("error", err.Error()),
|
||
|
)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// TODO: handle removal of the subnet in netmap candidates
|
||
|
}
|