forked from TrueCloudLab/frostfs-node
Evgenii Stratonikov
0948a280fa
With a non-blocking pool limited to a capacity of 10, the probability of dropping events is unexpectedly high. Notifications are an essential part of FrostFS, so we should not drop anything, especially new epochs. ``` Mar 31 07:07:03 vedi neofs-ir[19164]: 2023-03-31T07:07:03.901Z debug subscriber/subscriber.go:154 new notification event from sidechain {"name": "NewEpoch"} Mar 31 07:07:03 vedi neofs-ir[19164]: 2023-03-31T07:07:03.901Z warn event/listener.go:248 listener worker pool drained {"chain": "morph", "capacity": 10} ``` Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
301 lines
8.2 KiB
Go
301 lines
8.2 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/rand"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const (
	// newEpochNotification is the notification name under which the
	// new epoch parser is registered for the netmap contract.
	newEpochNotification = "NewEpoch"

	// notaryDepositRetriesAmount bounds how many tries (one block each)
	// are made before waiting for the notary deposit times out.
	notaryDepositRetriesAmount = 300
)
|
|
|
|
func initMorphComponents(c *cfg) {
|
|
var err error
|
|
|
|
addresses := morphconfig.RPCEndpoint(c.appCfg)
|
|
|
|
// Morph client stable-sorts endpoints by priority. Shuffle here to randomize
|
|
// order of endpoints with the same priority.
|
|
rand.Shuffle(len(addresses), func(i, j int) {
|
|
addresses[i], addresses[j] = addresses[j], addresses[i]
|
|
})
|
|
|
|
cli, err := client.New(c.key,
|
|
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
|
|
client.WithLogger(c.log),
|
|
client.WithEndpoints(addresses...),
|
|
client.WithConnLostCallback(func() {
|
|
c.internalErr <- errors.New("morph connection has been lost")
|
|
}),
|
|
client.WithSwitchInterval(morphconfig.SwitchInterval(c.appCfg)),
|
|
)
|
|
if err != nil {
|
|
c.log.Info("failed to create neo RPC client",
|
|
zap.Any("endpoints", addresses),
|
|
zap.String("error", err.Error()),
|
|
)
|
|
|
|
fatalOnErr(err)
|
|
}
|
|
|
|
c.onShutdown(func() {
|
|
c.log.Info("closing morph components...")
|
|
cli.Close()
|
|
})
|
|
|
|
if err := cli.SetGroupSignerScope(); err != nil {
|
|
c.log.Info("failed to set group signer scope, continue with Global", zap.Error(err))
|
|
}
|
|
|
|
c.cfgMorph.client = cli
|
|
c.cfgMorph.notaryEnabled = cli.ProbeNotary()
|
|
|
|
lookupScriptHashesInNNS(c) // smart contract auto negotiation
|
|
|
|
if c.cfgMorph.notaryEnabled {
|
|
err = c.cfgMorph.client.EnableNotarySupport(
|
|
client.WithProxyContract(
|
|
c.cfgMorph.proxyScriptHash,
|
|
),
|
|
)
|
|
fatalOnErr(err)
|
|
}
|
|
|
|
c.log.Info("notary support",
|
|
zap.Bool("sidechain_enabled", c.cfgMorph.notaryEnabled),
|
|
)
|
|
|
|
wrap, err := nmClient.NewFromMorph(c.cfgMorph.client, c.cfgNetmap.scriptHash, 0, nmClient.TryNotary())
|
|
fatalOnErr(err)
|
|
|
|
var netmapSource netmap.Source
|
|
|
|
c.cfgMorph.cacheTTL = morphconfig.CacheTTL(c.appCfg)
|
|
|
|
if c.cfgMorph.cacheTTL == 0 {
|
|
msPerBlock, err := c.cfgMorph.client.MsPerBlock()
|
|
fatalOnErr(err)
|
|
c.cfgMorph.cacheTTL = time.Duration(msPerBlock) * time.Millisecond
|
|
c.log.Debug("morph.cache_ttl fetched from network", zap.Duration("value", c.cfgMorph.cacheTTL))
|
|
}
|
|
|
|
if c.cfgMorph.cacheTTL < 0 {
|
|
netmapSource = wrap
|
|
} else {
|
|
// use RPC node as source of netmap (with caching)
|
|
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
|
|
}
|
|
|
|
c.netMapSource = netmapSource
|
|
c.cfgNetmap.wrapper = wrap
|
|
}
|
|
|
|
func makeAndWaitNotaryDeposit(ctx context.Context, c *cfg) {
|
|
// skip notary deposit in non-notary environments
|
|
if !c.cfgMorph.notaryEnabled {
|
|
return
|
|
}
|
|
|
|
tx, err := makeNotaryDeposit(c)
|
|
fatalOnErr(err)
|
|
|
|
if tx.Equals(util.Uint256{}) {
|
|
// non-error deposit with an empty TX hash means
|
|
// that the deposit has already been made; no
|
|
// need to wait it.
|
|
c.log.Info("notary deposit has already been made")
|
|
return
|
|
}
|
|
|
|
err = waitNotaryDeposit(ctx, c, tx)
|
|
fatalOnErr(err)
|
|
}
|
|
|
|
func makeNotaryDeposit(c *cfg) (util.Uint256, error) {
|
|
const (
|
|
// gasMultiplier defines how many times more the notary
|
|
// balance must be compared to the GAS balance of the node:
|
|
// notaryBalance = GASBalance * gasMultiplier
|
|
gasMultiplier = 3
|
|
|
|
// gasDivisor defines what part of GAS balance (1/gasDivisor)
|
|
// should be transferred to the notary service
|
|
gasDivisor = 2
|
|
)
|
|
|
|
depositAmount, err := client.CalculateNotaryDepositAmount(c.cfgMorph.client, gasMultiplier, gasDivisor)
|
|
if err != nil {
|
|
return util.Uint256{}, fmt.Errorf("could not calculate notary deposit: %w", err)
|
|
}
|
|
|
|
return c.cfgMorph.client.DepositEndlessNotary(depositAmount)
|
|
}
|
|
|
|
// Sentinel errors returned by waitNotaryDeposit.
var (
	// errNotaryDepositFail reports that the deposit transaction was found
	// but did not halt successfully.
	errNotaryDepositFail = errors.New("notary deposit tx has faulted")

	// errNotaryDepositTimeout reports that the deposit transaction was not
	// observed within notaryDepositRetriesAmount tries.
	errNotaryDepositTimeout = errors.New("notary deposit tx has not appeared in the network")
)
|
|
|
|
func waitNotaryDeposit(ctx context.Context, c *cfg, tx util.Uint256) error {
|
|
for i := 0; i < notaryDepositRetriesAmount; i++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
ok, err := c.cfgMorph.client.TxHalt(tx)
|
|
if err == nil {
|
|
if ok {
|
|
return nil
|
|
}
|
|
|
|
return errNotaryDepositFail
|
|
}
|
|
|
|
err = c.cfgMorph.client.Wait(ctx, 1)
|
|
if err != nil {
|
|
return fmt.Errorf("could not wait for one block in chain: %w", err)
|
|
}
|
|
}
|
|
|
|
return errNotaryDepositTimeout
|
|
}
|
|
|
|
func listenMorphNotifications(ctx context.Context, c *cfg) {
|
|
var (
|
|
err error
|
|
subs subscriber.Subscriber
|
|
)
|
|
|
|
fromSideChainBlock, err := c.persistate.UInt32(persistateSideChainLastBlockKey)
|
|
if err != nil {
|
|
fromSideChainBlock = 0
|
|
c.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
|
|
}
|
|
|
|
subs, err = subscriber.New(ctx, &subscriber.Params{
|
|
Log: c.log,
|
|
StartFromBlock: fromSideChainBlock,
|
|
Client: c.cfgMorph.client,
|
|
})
|
|
fatalOnErr(err)
|
|
|
|
lis, err := event.NewListener(event.ListenerParams{
|
|
Logger: c.log,
|
|
Subscriber: subs,
|
|
})
|
|
fatalOnErr(err)
|
|
|
|
c.workers = append(c.workers, newWorkerFromFunc(func(wCtx context.Context) {
|
|
runAndLog(wCtx, c, "morph notification", false, func(lCtx context.Context, c *cfg) {
|
|
lis.ListenWithError(lCtx, c.internalErr)
|
|
})
|
|
}))
|
|
|
|
setNetmapNotificationParser(c, newEpochNotification, func(src *state.ContainedNotificationEvent) (event.Event, error) {
|
|
res, err := netmapEvent.ParseNewEpoch(src)
|
|
if err == nil {
|
|
c.log.Info("new epoch event from sidechain",
|
|
zap.Uint64("number", res.(netmapEvent.NewEpoch).EpochNumber()),
|
|
)
|
|
}
|
|
|
|
return res, err
|
|
})
|
|
registerNotificationHandlers(c.cfgNetmap.scriptHash, lis, c.cfgNetmap.parsers, c.cfgNetmap.subscribers)
|
|
registerNotificationHandlers(c.cfgContainer.scriptHash, lis, c.cfgContainer.parsers, c.cfgContainer.subscribers)
|
|
|
|
registerBlockHandler(lis, func(block *block.Block) {
|
|
c.log.Debug("new block", zap.Uint32("index", block.Index))
|
|
|
|
err = c.persistate.SetUInt32(persistateSideChainLastBlockKey, block.Index)
|
|
if err != nil {
|
|
c.log.Warn("can't update persistent state",
|
|
zap.String("chain", "side"),
|
|
zap.Uint32("block_index", block.Index))
|
|
}
|
|
|
|
tickBlockTimers(c)
|
|
})
|
|
}
|
|
|
|
func registerNotificationHandlers(scHash util.Uint160, lis event.Listener, parsers map[event.Type]event.NotificationParser,
|
|
subs map[event.Type][]event.Handler) {
|
|
for typ, handlers := range subs {
|
|
pi := event.NotificationParserInfo{}
|
|
pi.SetType(typ)
|
|
pi.SetScriptHash(scHash)
|
|
|
|
p, ok := parsers[typ]
|
|
if !ok {
|
|
panic(fmt.Sprintf("missing parser for event %s", typ))
|
|
}
|
|
|
|
pi.SetParser(p)
|
|
|
|
lis.SetNotificationParser(pi)
|
|
|
|
for _, h := range handlers {
|
|
hi := event.NotificationHandlerInfo{}
|
|
hi.SetType(typ)
|
|
hi.SetScriptHash(scHash)
|
|
hi.SetHandler(h)
|
|
|
|
lis.RegisterNotificationHandler(hi)
|
|
}
|
|
}
|
|
}
|
|
|
|
func registerBlockHandler(lis event.Listener, handler event.BlockHandler) {
|
|
lis.RegisterBlockHandler(handler)
|
|
}
|
|
|
|
// lookupScriptHashesInNNS looks up for contract script hashes in NNS contract of side
|
|
// chain if they were not specified in config file.
|
|
func lookupScriptHashesInNNS(c *cfg) {
|
|
var (
|
|
err error
|
|
|
|
emptyHash = util.Uint160{}
|
|
targets = [...]struct {
|
|
h *util.Uint160
|
|
nnsName string
|
|
}{
|
|
{&c.cfgNetmap.scriptHash, client.NNSNetmapContractName},
|
|
{&c.cfgAccounting.scriptHash, client.NNSBalanceContractName},
|
|
{&c.cfgContainer.scriptHash, client.NNSContainerContractName},
|
|
{&c.cfgReputation.scriptHash, client.NNSReputationContractName},
|
|
{&c.cfgMorph.proxyScriptHash, client.NNSProxyContractName},
|
|
}
|
|
)
|
|
|
|
for _, t := range targets {
|
|
if t.nnsName == client.NNSProxyContractName && !c.cfgMorph.notaryEnabled {
|
|
continue // ignore proxy contract if notary disabled
|
|
}
|
|
|
|
if emptyHash.Equals(*t.h) {
|
|
*t.h, err = c.cfgMorph.client.NNSContractAddress(t.nnsName)
|
|
fatalOnErrDetails(fmt.Sprintf("can't resolve %s in NNS", t.nnsName), err)
|
|
}
|
|
}
|
|
}
|