frostfs-node/pkg/innerring/processors/netmap/processor.go
Alejandro Lopez ebcc8afbee [] Add inner-ring event metrics
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-05-31 10:11:48 +00:00

255 lines
7.4 KiB
Go

package netmap
import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
type (
// EpochTimerReseter is a callback interface for tickers component.
EpochTimerReseter interface {
ResetEpochTimer(uint32) error
}
// EpochState is a callback interface for inner ring global state.
EpochState interface {
SetEpochCounter(uint64)
EpochCounter() uint64
SetEpochDuration(uint64)
EpochDuration() uint64
}
// AlphabetState is a callback interface for inner ring global state.
AlphabetState interface {
IsAlphabet() bool
}
// NodeValidator wraps basic method of checking the correctness
// of information about the node and its finalization for adding
// to the network map.
NodeValidator interface {
// Must verify and optionally update NodeInfo structure.
//
// Must return an error if NodeInfo input is invalid.
// Must return an error if it is not possible to correctly
// change the structure for sending to the network map.
//
// If no error occurs, the parameter must point to the
// ready-made NodeInfo structure.
VerifyAndUpdate(*netmap.NodeInfo) error
}
Client interface {
MorphNotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, nonce uint32, vub *uint32, method string, args ...any) error
ContractAddress() util.Uint160
EpochDuration() (uint64, error)
MorphTxHeight(h util.Uint256) (res uint32, err error)
NetMap() (*netmap.NetMap, error)
NewEpoch(epoch uint64, force bool) error
MorphIsValidScript(script []byte, signers []transaction.Signer) (valid bool, err error)
MorphNotarySignAndInvokeTX(mainTx *transaction.Transaction) error
}
ContainerClient interface {
StartEstimation(p cntClient.StartEstimationPrm) error
}
// Processor of events produced by network map contract
// and new epoch ticker, because it is related to contract.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
epochTimer EpochTimerReseter
epochState EpochState
alphabetState AlphabetState
netmapClient Client
containerWrp ContainerClient
netmapSnapshot cleanupTable
handleAlphabetSync event.Handler
handleNotaryDeposit event.Handler
nodeValidator NodeValidator
nodeStateSettings state.NetworkSettings
}
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
PoolSize int
NetmapClient Client
EpochTimer EpochTimerReseter
EpochState EpochState
AlphabetState AlphabetState
CleanupEnabled bool
CleanupThreshold uint64 // in epochs
ContainerWrapper ContainerClient
AlphabetSyncHandler event.Handler
NotaryDepositHandler event.Handler
NodeValidator NodeValidator
NodeStateSettings state.NetworkSettings
}
)
const (
newEpochNotification = "NewEpoch"
)
// New creates network map contract processor instance.
func New(p *Params) (*Processor, error) {
switch {
case p.Log == nil:
return nil, errors.New("ir/netmap: logger is not set")
case p.EpochTimer == nil:
return nil, errors.New("ir/netmap: epoch itmer is not set")
case p.EpochState == nil:
return nil, errors.New("ir/netmap: global state is not set")
case p.AlphabetState == nil:
return nil, errors.New("ir/netmap: global state is not set")
case p.AlphabetSyncHandler == nil:
return nil, errors.New("ir/netmap: alphabet sync handler is not set")
case p.NotaryDepositHandler == nil:
return nil, errors.New("ir/netmap: notary deposit handler is not set")
case p.ContainerWrapper == nil:
return nil, errors.New("ir/netmap: container contract wrapper is not set")
case p.NodeValidator == nil:
return nil, errors.New("ir/netmap: node validator is not set")
case p.NodeStateSettings == nil:
return nil, errors.New("ir/netmap: node state settings is not set")
}
p.Log.Debug(logs.NetmapNetmapWorkerPool, zap.Int("size", p.PoolSize))
pool, err := ants.NewPool(p.PoolSize, ants.WithNonblocking(true))
if err != nil {
return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
epochTimer: p.EpochTimer,
epochState: p.EpochState,
alphabetState: p.AlphabetState,
netmapClient: p.NetmapClient,
containerWrp: p.ContainerWrapper,
netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold),
handleAlphabetSync: p.AlphabetSyncHandler,
handleNotaryDeposit: p.NotaryDepositHandler,
nodeValidator: p.NodeValidator,
nodeStateSettings: p.NodeStateSettings,
}, nil
}
// ListenerNotificationParsers for the 'event.Listener' event producer.
func (np *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
parsers := make([]event.NotificationParserInfo, 0, 3)
var p event.NotificationParserInfo
p.SetScriptHash(np.netmapClient.ContractAddress())
// new epoch event
p.SetType(newEpochNotification)
p.SetParser(netmapEvent.ParseNewEpoch)
parsers = append(parsers, p)
return parsers
}
// ListenerNotificationHandlers for the 'event.Listener' event producer.
func (np *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
handlers := make([]event.NotificationHandlerInfo, 0, 3)
var i event.NotificationHandlerInfo
i.SetScriptHash(np.netmapClient.ContractAddress())
// new epoch handler
i.SetType(newEpochNotification)
i.SetHandler(np.handleNewEpoch)
handlers = append(handlers, i)
return handlers
}
// ListenerNotaryParsers for the 'event.Listener' event producer.
func (np *Processor) ListenerNotaryParsers() []event.NotaryParserInfo {
var (
p event.NotaryParserInfo
pp = make([]event.NotaryParserInfo, 0, 2)
)
p.SetMempoolType(mempoolevent.TransactionAdded)
p.SetScriptHash(np.netmapClient.ContractAddress())
// new peer
p.SetRequestType(netmapEvent.AddPeerNotaryEvent)
p.SetParser(netmapEvent.ParseAddPeerNotary)
pp = append(pp, p)
// update state
p.SetRequestType(netmapEvent.UpdateStateNotaryEvent)
p.SetParser(netmapEvent.ParseUpdatePeerNotary)
pp = append(pp, p)
return pp
}
// ListenerNotaryHandlers for the 'event.Listener' event producer.
func (np *Processor) ListenerNotaryHandlers() []event.NotaryHandlerInfo {
var (
h event.NotaryHandlerInfo
hh = make([]event.NotaryHandlerInfo, 0, 2)
)
h.SetMempoolType(mempoolevent.TransactionAdded)
h.SetScriptHash(np.netmapClient.ContractAddress())
// new peer
h.SetRequestType(netmapEvent.AddPeerNotaryEvent)
h.SetHandler(np.handleAddPeer)
hh = append(hh, h)
// update state
h.SetRequestType(netmapEvent.UpdateStateNotaryEvent)
h.SetHandler(np.handleUpdateState)
hh = append(hh, h)
return hh
}