From d0d1731af75e1a4335021aaedf509f57ddc9b5ce Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Thu, 25 Mar 2021 16:23:53 +0300 Subject: [PATCH] [#447] innerring: Define governance processor Signed-off-by: Alex Vanin --- pkg/innerring/processors/governance/events.go | 11 ++ .../processors/governance/handlers.go | 19 +++ .../processors/governance/process_update.go | 98 ++++++++++++++++ .../processors/governance/processor.go | 109 ++++++++++++++++++ 4 files changed, 237 insertions(+) create mode 100644 pkg/innerring/processors/governance/events.go create mode 100644 pkg/innerring/processors/governance/handlers.go create mode 100644 pkg/innerring/processors/governance/process_update.go create mode 100644 pkg/innerring/processors/governance/processor.go diff --git a/pkg/innerring/processors/governance/events.go b/pkg/innerring/processors/governance/events.go new file mode 100644 index 000000000..a56fc47fa --- /dev/null +++ b/pkg/innerring/processors/governance/events.go @@ -0,0 +1,11 @@ +package governance + +// Sync is a event to start governance synchronization. +type Sync struct{} + +// MorphEvent implements Event interface. +func (s Sync) MorphEvent() {} + +func NewSyncEvent() Sync { + return Sync{} +} diff --git a/pkg/innerring/processors/governance/handlers.go b/pkg/innerring/processors/governance/handlers.go new file mode 100644 index 000000000..1b41f0c9a --- /dev/null +++ b/pkg/innerring/processors/governance/handlers.go @@ -0,0 +1,19 @@ +package governance + +import ( + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + "go.uber.org/zap" +) + +func (gp *Processor) HandleAlphabetSync(_ event.Event) { + gp.log.Info("new event", zap.String("type", "sync")) + + // send event to the worker pool + + err := gp.pool.Submit(func() { gp.processAlphabetSync() }) + if err != nil { + // there system can be moved into controlled degradation stage + gp.log.Warn("governance worker pool drained", + zap.Int("capacity", gp.pool.Cap())) + } +} diff --git a/pkg/innerring/processors/governance/process_update.go b/pkg/innerring/processors/governance/process_update.go new file mode 100644 index 000000000..e86522303 --- /dev/null +++ b/pkg/innerring/processors/governance/process_update.go @@ -0,0 +1,98 @@ +package governance + +import ( + "encoding/binary" + "sort" + + "github.com/nspcc-dev/neofs-node/pkg/innerring/invoke" + "go.uber.org/zap" +) + +const alphabetUpdateIDPrefix = "AlphabetUpdate" + +func (gp *Processor) processAlphabetSync() { + if !gp.alphabetState.IsAlphabet() { + gp.log.Info("non alphabet mode, ignore alphabet sync") + return + } + + mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList() + if err != nil { + gp.log.Error("can't fetch alphabet list from main net", + zap.String("error", err.Error())) + return + } + + sidechainAlphabet, err := gp.morphClient.Committee() + if err != nil { + gp.log.Error("can't fetch alphabet list from side chain", + zap.String("error", err.Error())) + return + } + + newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet) + if err != nil { + gp.log.Error("can't merge alphabet lists from main net and side chain", + zap.String("error", err.Error())) + return + } + + if newAlphabet == nil { + gp.log.Info("no governance update, alphabet list has not been changed") + return + } + + gp.log.Info("alphabet list has been changed, starting update") + sort.Sort(newAlphabet) + + // 1. Vote to side chain committee via alphabet contracts. + err = gp.voter.VoteForSidechainValidator(newAlphabet) + if err != nil { + gp.log.Error("can't vote for side chain committee", + zap.String("error", err.Error())) + } + + // 2. Update NeoFSAlphabet role in side chain. + innerRing, err := gp.morphClient.NeoFSAlphabetList() + if err != nil { + gp.log.Error("can't fetch inner ring list from side chain", + zap.String("error", err.Error())) + } else { + newInnerRing, err := updateInnerRing(innerRing, sidechainAlphabet, newAlphabet) + if err != nil { + gp.log.Error("can't create new inner ring list with new alphabet keys", + zap.String("error", err.Error())) + } else { + sort.Sort(newInnerRing) + + err = gp.morphClient.UpdateNeoFSAlphabetList(newInnerRing) + if err != nil { + gp.log.Error("can't update inner ring list with new alphabet keys", + zap.String("error", err.Error())) + } + } + } + + // 3. Update notary role in side chain. + err = gp.morphClient.UpdateNotaryList(newAlphabet) + if err != nil { + gp.log.Error("can't update list of notary nodes in side chain", + zap.String("error", err.Error())) + } + + // 4. Update NeoFS contract in main net. + epoch := gp.epochState.EpochCounter() + + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, epoch) + + id := append([]byte(alphabetUpdateIDPrefix), buf...) + + err = invoke.AlphabetUpdate(gp.mainnetClient, gp.neofsContract, id, newAlphabet) + if err != nil { + gp.log.Error("can't update list of alphabet nodes in neofs contract", + zap.String("error", err.Error())) + } + + gp.log.Info("finished alphabet list update") +} diff --git a/pkg/innerring/processors/governance/processor.go b/pkg/innerring/processors/governance/processor.go new file mode 100644 index 000000000..47fe0bdb0 --- /dev/null +++ b/pkg/innerring/processors/governance/processor.go @@ -0,0 +1,109 @@ +package governance + +import ( + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-node/pkg/morph/client" + "github.com/nspcc-dev/neofs-node/pkg/morph/event" + "github.com/panjf2000/ants/v2" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// GovernanceProcessor manages governance sync tasks. This process must not be +// interrupted by other sync operation, so we limit pool size for processor to +// one. +const ProcessorPoolSize = 1 + +type ( + // AlphabetState is a callback interface for inner ring global state. + AlphabetState interface { + IsAlphabet() bool + } + + // Voter is a callback interface for alphabet contract voting. + Voter interface { + VoteForSidechainValidator(keys keys.PublicKeys) error + } + + // EpochState is a callback interface for inner ring global state. + EpochState interface { + EpochCounter() uint64 + } + + // Processor of events related to governance in the network. + Processor struct { + log *zap.Logger + pool *ants.Pool + neofsContract util.Uint160 + + alphabetState AlphabetState + epochState EpochState + voter Voter + + mainnetClient *client.Client + morphClient *client.Client + } + + // Params of the processor constructor. + Params struct { + Log *zap.Logger + NeoFSContract util.Uint160 + + AlphabetState AlphabetState + EpochState EpochState + Voter Voter + + MorphClient *client.Client + MainnetClient *client.Client + } +) + +// New creates balance contract processor instance. +func New(p *Params) (*Processor, error) { + switch { + case p.Log == nil: + return nil, errors.New("ir/governance: logger is not set") + case p.MainnetClient == nil: + return nil, errors.New("ir/governance: neo:mainnet client is not set") + case p.MorphClient == nil: + return nil, errors.New("ir/governance: neo:sidechain client is not set") + case p.AlphabetState == nil: + return nil, errors.New("ir/governance: global state is not set") + case p.EpochState == nil: + return nil, errors.New("ir/governance: global state is not set") + case p.Voter == nil: + return nil, errors.New("ir/governance: global state is not set") + } + + pool, err := ants.NewPool(ProcessorPoolSize, ants.WithNonblocking(true)) + if err != nil { + return nil, errors.Wrap(err, "ir/governance: can't create worker pool") + } + + return &Processor{ + log: p.Log, + pool: pool, + neofsContract: p.NeoFSContract, + alphabetState: p.AlphabetState, + epochState: p.EpochState, + voter: p.Voter, + mainnetClient: p.MainnetClient, + morphClient: p.MorphClient, + }, nil +} + +// ListenerParsers for the 'event.Listener' event producer. +func (gp *Processor) ListenerParsers() []event.ParserInfo { + return nil +} + +// ListenerHandlers for the 'event.Listener' event producer. +func (gp *Processor) ListenerHandlers() []event.HandlerInfo { + return nil +} + +// TimersHandlers for the 'Timers' event producer. +func (gp *Processor) TimersHandlers() []event.HandlerInfo { + return nil +}