frostfs-node/pkg/innerring/innerring.go
Alex Vanin 3aae60d517 [#454] innerring: Increase duration of notary deposit
Extra blocks for notary deposit must not be less than extra blocks
at notary tx rounding.

Consider you make notary deposit every 1000 block for next
1100 blocks. At block 555 you made notary deposit up to 1655.

At block 1554 you want to send notary tx. Notary client uses rounding
to calculate `until` value. By default notary client rounds with up
to 150 block ahead, thus for tx at 1554 `until` will be 1700.

1700 is bigger than deposit limit at 1655 and tx will fail. However
if extra blocks for notary deposit will be 200, then this case
won't be possible.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
2021-04-02 11:26:37 +03:00

804 lines
22 KiB
Go

package innerring
import (
"context"
"crypto/ecdsa"
"io"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/util"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-node/pkg/innerring/config"
"github.com/nspcc-dev/neofs-node/pkg/innerring/invoke"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/balance"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/container"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/governance"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/neofs"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement"
auditSettlement "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit"
"github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
auditWrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/morph/subscriber"
audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager"
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/precision"
"github.com/panjf2000/ants/v2"
"github.com/pkg/errors"
"github.com/spf13/viper"
"go.uber.org/atomic"
"go.uber.org/zap"
)
type (
// Server is the inner ring application structure, that contains all event
// processors, shared variables and event handlers.
Server struct {
log *zap.Logger
// event producers
morphListener event.Listener
mainnetListener event.Listener
blockTimers []*timers.BlockTimer
epochTimer *timers.BlockTimer
// global state
morphClient *client.Client
mainnetClient *client.Client
epochCounter atomic.Uint64
statusIndex *innerRingIndexer
precision precision.Fixed8Converter
auditClient *auditWrapper.ClientWrapper
notaryDepositAmount fixedn.Fixed8
notaryDuration uint32
// internal variables
key *ecdsa.PrivateKey
pubKey []byte
contracts *contracts
predefinedValidators keys.PublicKeys
workers []func(context.Context)
// Set of local resources that must be
// initialized at the very beginning of
// Server's work, (e.g. opening files).
//
// If any starter returns an error, Server's
// starting fails immediately.
starters []func() error
// Set of local resources that must be
// released at Server's work completion
// (e.g closing files).
//
// Closer's wrong outcome shouldn't be critical.
//
// Errors are logged.
closers []func() error
}
contracts struct {
neofs util.Uint160 // in mainnet
netmap util.Uint160 // in morph
balance util.Uint160 // in morph
container util.Uint160 // in morph
audit util.Uint160 // in morph
proxy util.Uint160 // in morph
alphabet alphabetContracts // in morph
}
chainParams struct {
log *zap.Logger
cfg *viper.Viper
key *ecdsa.PrivateKey
name string
gas util.Uint160
}
)
const (
morphPrefix = "morph"
mainnetPrefix = "mainnet"
// extra blocks to overlap two deposits, we do that to make sure that
// there won't be any blocks without deposited assets in notary contract;
// make sure it is bigger than any extra rounding value in notary client.
notaryExtraBlocks = 300
// amount of tries before notary deposit timeout.
notaryDepositTimeout = 100
)
var (
errDepositTimeout = errors.New("notary deposit didn't appeared in the network")
)
// Start runs all event providers.
func (s *Server) Start(ctx context.Context, intError chan<- error) error {
for _, starter := range s.starters {
if err := starter(); err != nil {
return err
}
}
err := s.initConfigFromBlockchain()
if err != nil {
return err
}
// make an initial deposit to notary contract to enable it
err = s.depositNotary()
if err != nil {
return err
}
// wait a bit for notary contract deposit
s.log.Info("waiting to accept notary deposit")
err = s.awaitNotaryDeposit(ctx)
if err != nil {
return err
}
// vote for sidechain validator if it is prepared in config
err = s.voteForSidechainValidator(s.predefinedValidators)
if err != nil {
// we don't stop inner ring execution on this error
s.log.Warn("can't vote for prepared validators",
zap.String("error", err.Error()))
}
morphErr := make(chan error)
mainnnetErr := make(chan error)
// anonymous function to multiplex error channels
go func() {
select {
case <-ctx.Done():
return
case err := <-morphErr:
intError <- errors.Wrap(err, "sidechain")
case err := <-mainnnetErr:
intError <- errors.Wrap(err, "mainnet")
}
}()
s.morphListener.RegisterBlockHandler(func(b *block.Block) {
s.log.Debug("new block",
zap.Uint32("index", b.Index),
)
s.tickTimers()
})
go s.morphListener.ListenWithError(ctx, morphErr) // listen for neo:morph events
go s.mainnetListener.ListenWithError(ctx, mainnnetErr) // listen for neo:mainnet events
if err := s.startBlockTimers(); err != nil {
return errors.Wrap(err, "could not start block timers")
}
s.startWorkers(ctx)
return nil
}
func (s *Server) startWorkers(ctx context.Context) {
for _, w := range s.workers {
go w(ctx)
}
}
// Stop closes all subscription channels.
func (s *Server) Stop() {
go s.morphListener.Stop()
go s.mainnetListener.Stop()
for _, c := range s.closers {
if err := c(); err != nil {
s.log.Warn("closer error",
zap.String("error", err.Error()),
)
}
}
}
func (s *Server) registerIOCloser(c io.Closer) {
s.registerCloser(c.Close)
}
func (s *Server) registerCloser(f func() error) {
s.closers = append(s.closers, f)
}
func (s *Server) registerStarter(f func() error) {
s.starters = append(s.starters, f)
}
// New creates instance of inner ring sever structure.
func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error) {
var err error
server := &Server{log: log}
// prepare inner ring node private key
server.key, err = crypto.LoadPrivateKey(cfg.GetString("key"))
if err != nil {
return nil, errors.Wrap(err, "ir: can't create private key")
}
// get all script hashes of contracts
server.contracts, err = parseContracts(cfg)
if err != nil {
return nil, err
}
// parse default validators
server.predefinedValidators, err = parsePredefinedValidators(cfg)
if err != nil {
return nil, errors.Wrap(err, "ir: can't parse predefined validators list")
}
morphChain := &chainParams{
log: log,
cfg: cfg,
key: server.key,
name: morphPrefix,
}
// create morph listener
server.morphListener, err = createListener(ctx, morphChain)
if err != nil {
return nil, err
}
// create morph client
server.morphClient, err = createClient(ctx, morphChain)
if err != nil {
return nil, err
}
err = server.morphClient.EnableNotarySupport(
server.contracts.proxy,
server.contracts.netmap,
)
if err != nil {
return nil, err
}
if cfg.GetBool("without_mainnet") {
// This works as long as event Listener starts listening loop once,
// otherwise Server.Start will run two similar routines.
// This behavior most likely will not change.
server.mainnetListener = server.morphListener
server.mainnetClient = server.morphClient
} else {
mainnetChain := morphChain
mainnetChain.name = mainnetPrefix
// create mainnet listener
server.mainnetListener, err = createListener(ctx, mainnetChain)
if err != nil {
return nil, err
}
// create mainnet client
server.mainnetClient, err = createClient(ctx, mainnetChain)
if err != nil {
return nil, err
}
}
server.pubKey = crypto.MarshalPublicKey(&server.key.PublicKey)
server.statusIndex = newInnerRingIndexer(
server.morphClient,
&server.key.PublicKey,
cfg.GetDuration("indexer.cache_timeout"),
)
auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size"))
if err != nil {
return nil, err
}
server.auditClient, err = invoke.NewAuditClient(server.morphClient, server.contracts.audit)
if err != nil {
return nil, err
}
cnrClient, err := invoke.NewContainerClient(server.morphClient, server.contracts.container)
if err != nil {
return nil, err
}
nmClient, err := invoke.NewNetmapClient(server.morphClient, server.contracts.netmap)
if err != nil {
return nil, err
}
balClient, err := invoke.NewBalanceClient(server.morphClient, server.contracts.balance)
if err != nil {
return nil, err
}
// create global runtime config reader
globalConfig := config.NewGlobalConfigReader(cfg, nmClient)
clientCache := newClientCache(&clientCacheParams{
Log: log,
Key: server.key,
SGTimeout: cfg.GetDuration("audit.timeout.get"),
HeadTimeout: cfg.GetDuration("audit.timeout.head"),
RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"),
})
pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size")
porPoolSize := cfg.GetInt("audit.por.pool_size")
// create audit processor dependencies
auditTaskManager := audittask.New(
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
audittask.WithWorkerPool(auditPool),
audittask.WithLogger(log),
audittask.WithContainerCommunicator(clientCache),
audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")),
audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) {
return ants.NewPool(pdpPoolSize)
}),
audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) {
return ants.NewPool(porPoolSize)
}),
)
server.workers = append(server.workers, auditTaskManager.Listen)
// create audit processor
auditProcessor, err := audit.New(&audit.Params{
Log: log,
NetmapContract: server.contracts.netmap,
ContainerContract: server.contracts.container,
AuditContract: server.contracts.audit,
MorphClient: server.morphClient,
IRList: server,
ClientCache: clientCache,
Key: server.key,
RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"),
TaskManager: auditTaskManager,
Reporter: server,
})
if err != nil {
return nil, err
}
// create settlement processor dependencies
settlementDeps := &settlementDeps{
log: server.log,
cnrSrc: cnrClient,
auditClient: server.auditClient,
nmSrc: nmClient,
clientCache: clientCache,
balanceClient: balClient,
}
auditCalcDeps := &auditSettlementDeps{
settlementDeps: settlementDeps,
}
basicSettlementDeps := &basicIncomeSettlementDeps{
settlementDeps: settlementDeps,
cnrClient: cnrClient,
cfg: globalConfig,
}
auditSettlementCalc := auditSettlement.NewCalculator(
&auditSettlement.CalculatorPrm{
ResultStorage: auditCalcDeps,
ContainerStorage: auditCalcDeps,
PlacementCalculator: auditCalcDeps,
SGStorage: auditCalcDeps,
AccountStorage: auditCalcDeps,
Exchanger: auditCalcDeps,
},
auditSettlement.WithLogger(server.log),
)
// create settlement processor
settlementProcessor := settlement.New(
settlement.Prm{
AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc),
BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps},
State: server,
},
settlement.WithLogger(server.log),
)
locodeValidator, err := server.newLocodeValidator(cfg)
if err != nil {
return nil, err
}
// create governance processor
governanceProcessor, err := governance.New(&governance.Params{
Log: log,
NeoFSContract: server.contracts.neofs,
AlphabetState: server,
EpochState: server,
Voter: server,
MorphClient: server.morphClient,
MainnetClient: server.mainnetClient,
})
if err != nil {
return nil, err
}
// create netmap processor
netmapProcessor, err := netmap.New(&netmap.Params{
Log: log,
PoolSize: cfg.GetInt("workers.netmap"),
NetmapContract: server.contracts.netmap,
EpochTimer: server,
MorphClient: server.morphClient,
EpochState: server,
AlphabetState: server,
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
ContainerWrapper: cnrClient,
HandleAudit: server.onlyActiveEventHandler(
auditProcessor.StartAuditHandler(),
),
AuditSettlementsHandler: server.onlyAlphabetEventHandler(
settlementProcessor.HandleAuditEvent,
),
AlphabetSyncHandler: governanceProcessor.HandleAlphabetSync,
NodeValidator: locodeValidator,
})
if err != nil {
return nil, err
}
err = bindMorphProcessor(netmapProcessor, server)
if err != nil {
return nil, err
}
// container processor
containerProcessor, err := container.New(&container.Params{
Log: log,
PoolSize: cfg.GetInt("workers.container"),
ContainerContract: server.contracts.container,
MorphClient: server.morphClient,
AlphabetState: server,
})
if err != nil {
return nil, err
}
err = bindMorphProcessor(containerProcessor, server)
if err != nil {
return nil, err
}
// create balance processor
balanceProcessor, err := balance.New(&balance.Params{
Log: log,
PoolSize: cfg.GetInt("workers.balance"),
NeoFSContract: server.contracts.neofs,
BalanceContract: server.contracts.balance,
MainnetClient: server.mainnetClient,
AlphabetState: server,
Converter: &server.precision,
})
if err != nil {
return nil, err
}
err = bindMorphProcessor(balanceProcessor, server)
if err != nil {
return nil, err
}
// todo: create reputation processor
// create mainnnet neofs processor
neofsProcessor, err := neofs.New(&neofs.Params{
Log: log,
PoolSize: cfg.GetInt("workers.neofs"),
NeoFSContract: server.contracts.neofs,
BalanceContract: server.contracts.balance,
NetmapContract: server.contracts.netmap,
MorphClient: server.morphClient,
EpochState: server,
AlphabetState: server,
Converter: &server.precision,
MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"),
MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"),
MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")),
GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"),
})
if err != nil {
return nil, err
}
err = bindMainnetProcessor(neofsProcessor, server)
if err != nil {
return nil, err
}
// create alphabet processor
alphabetProcessor, err := alphabet.New(&alphabet.Params{
Log: log,
PoolSize: cfg.GetInt("workers.alphabet"),
AlphabetContracts: server.contracts.alphabet,
NetmapContract: server.contracts.netmap,
MorphClient: server.morphClient,
IRList: server,
StorageEmission: cfg.GetUint64("emit.storage.amount"),
})
if err != nil {
return nil, err
}
err = bindMorphProcessor(alphabetProcessor, server)
if err != nil {
return nil, err
}
// todo: create vivid id component
// initialize epoch timers
server.epochTimer = newEpochTimer(&epochTimerArgs{
l: server.log,
nm: netmapProcessor,
cnrWrapper: cnrClient,
epoch: server,
epochDuration: cfg.GetUint32("timers.epoch"),
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),
stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"),
collectBasicIncome: subEpochEventHandler{
handler: settlementProcessor.HandleIncomeCollectionEvent,
durationMul: cfg.GetUint32("timers.collect_basic_income.mul"),
durationDiv: cfg.GetUint32("timers.collect_basic_income.div"),
},
distributeBasicIncome: subEpochEventHandler{
handler: settlementProcessor.HandleIncomeDistributionEvent,
durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"),
durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"),
},
})
server.addBlockTimer(server.epochTimer)
// initialize emission timer
emissionTimer := newEmissionTimer(&emitTimerArgs{
ap: alphabetProcessor,
emitDuration: cfg.GetUint32("timers.emit"),
})
server.addBlockTimer(emissionTimer)
// initialize notary deposit timer
server.notaryDepositAmount = fixedn.Fixed8(cfg.GetInt64("notary.deposit_amount"))
server.notaryDuration = cfg.GetUint32("timers.notary")
notaryTimer := newNotaryDepositTimer(&notaryDepositArgs{
l: log,
depositor: server.depositNotary,
notaryDuration: server.notaryDuration,
})
server.addBlockTimer(notaryTimer)
return server, nil
}
func createListener(ctx context.Context, p *chainParams) (event.Listener, error) {
sub, err := subscriber.New(ctx, &subscriber.Params{
Log: p.log,
Endpoint: p.cfg.GetString(p.name + ".endpoint.notification"),
DialTimeout: p.cfg.GetDuration(p.name + ".dial_timeout"),
})
if err != nil {
return nil, err
}
listener, err := event.NewListener(event.ListenerParams{
Logger: p.log,
Subscriber: sub,
})
if err != nil {
return nil, err
}
return listener, err
}
func createClient(ctx context.Context, p *chainParams) (*client.Client, error) {
return client.New(
p.key,
p.cfg.GetString(p.name+".endpoint.client"),
client.WithContext(ctx),
client.WithLogger(p.log),
client.WithDialTimeout(p.cfg.GetDuration(p.name+".dial_timeout")),
)
}
func parseContracts(cfg *viper.Viper) (*contracts, error) {
var (
result = new(contracts)
err error
)
netmapContractStr := cfg.GetString("contracts.netmap")
neofsContractStr := cfg.GetString("contracts.neofs")
balanceContractStr := cfg.GetString("contracts.balance")
containerContractStr := cfg.GetString("contracts.container")
auditContractStr := cfg.GetString("contracts.audit")
proxyContractStr := cfg.GetString("contracts.proxy")
result.netmap, err = util.Uint160DecodeStringLE(netmapContractStr)
if err != nil {
return nil, errors.Wrap(err, "ir: can't read netmap script-hash")
}
result.neofs, err = util.Uint160DecodeStringLE(neofsContractStr)
if err != nil {
return nil, errors.Wrap(err, "ir: can't read neofs script-hash")
}
result.balance, err = util.Uint160DecodeStringLE(balanceContractStr)
if err != nil {
return nil, errors.Wrap(err, "ir: can't read balance script-hash")
}
result.container, err = util.Uint160DecodeStringLE(containerContractStr)
if err != nil {
return nil, errors.Wrap(err, "ir: can't read container script-hash")
}
result.audit, err = util.Uint160DecodeStringLE(auditContractStr)
if err != nil {
return nil, errors.Wrap(err, "ir: can't read audit script-hash")
}
result.proxy, err = util.Uint160DecodeStringLE(proxyContractStr)
if err != nil {
return nil, errors.Wrap(err, "ir: can't read proxy script-hash")
}
result.alphabet, err = parseAlphabetContracts(cfg)
if err != nil {
return nil, err
}
return result, nil
}
func parsePredefinedValidators(cfg *viper.Viper) (keys.PublicKeys, error) {
publicKeyStrings := cfg.GetStringSlice("morph.validators")
return ParsePublicKeysFromStrings(publicKeyStrings)
}
// ParsePublicKeysFromStrings returns slice of neo public keys from slice
// of hex encoded strings.
func ParsePublicKeysFromStrings(pubKeys []string) (keys.PublicKeys, error) {
publicKeys := make(keys.PublicKeys, 0, len(pubKeys))
for i := range pubKeys {
key, err := keys.NewPublicKeyFromString(pubKeys[i])
if err != nil {
return nil, errors.Wrap(err, "can't decode public key")
}
publicKeys = append(publicKeys, key)
}
return publicKeys, nil
}
func parseAlphabetContracts(cfg *viper.Viper) (alphabetContracts, error) {
num := glagoliticLetter(cfg.GetUint("contracts.alphabet.amount"))
alpha := newAlphabetContracts()
if num > lastLetterNum {
return nil, errors.Errorf("amount of alphabet contracts overflows glagolitsa %d > %d", num, lastLetterNum)
}
for letter := az; letter < num; letter++ {
contractStr := cfg.GetString("contracts.alphabet." + letter.configString())
contractHash, err := util.Uint160DecodeStringLE(contractStr)
if err != nil {
return nil, errors.Wrapf(err, "invalid alphabet %s contract: %s", letter.configString(), contractStr)
}
alpha.set(letter, contractHash)
}
return alpha, nil
}
func (s *Server) initConfigFromBlockchain() error {
// get current epoch
epoch, err := invoke.Epoch(s.morphClient, s.contracts.netmap)
if err != nil {
return errors.Wrap(err, "can't read epoch")
}
// get balance precision
balancePrecision, err := invoke.BalancePrecision(s.morphClient, s.contracts.balance)
if err != nil {
return errors.Wrap(err, "can't read balance contract precision")
}
s.epochCounter.Store(uint64(epoch))
s.precision.SetBalancePrecision(balancePrecision)
s.log.Debug("read config from blockchain",
zap.Bool("active", s.IsActive()),
zap.Bool("alphabet", s.IsAlphabet()),
zap.Int64("epoch", epoch),
zap.Uint32("precision", balancePrecision),
)
return nil
}
// onlyActiveHandler wrapper around event handler that executes it
// only if inner ring node state is active.
func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) {
if s.IsActive() {
f(ev)
}
}
}
// onlyAlphabet wrapper around event handler that executes it
// only if inner ring node is alphabet node.
func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) {
if s.IsAlphabet() {
f(ev)
}
}
}
func (s *Server) depositNotary() error {
return s.morphClient.DepositNotary(
s.notaryDepositAmount,
s.notaryDuration+notaryExtraBlocks,
)
}
func (s *Server) awaitNotaryDeposit(ctx context.Context) error {
for i := 0; i < notaryDepositTimeout; i++ {
select {
case <-ctx.Done():
return nil
default:
}
deposit, err := s.morphClient.GetNotaryDeposit()
if err != nil {
return errors.Wrap(err, "can't get notary deposit")
}
if deposit > 0 {
return nil
}
s.log.Info("empty notary deposit, waiting one more block")
s.morphClient.Wait(ctx, 1)
}
return errDepositTimeout
}