mirror of
https://github.com/nspcc-dev/neo-go.git
synced 2025-01-04 03:51:35 +00:00
eeeb0f6f0e
Blockchain's notificationDispatcher sends events to channels and these channels must be read from. Unfortunately, regular service shutdown procedure does unsubscription first (outside of the read loop) and only then drains the channel. While it waits for unsubscription request to be accepted notificationDispatcher can try pushing more data into the same channel which will lead to a deadlock. Reading in the same method solves this, any number of events can be pushed until unsub channel accepts the data.
173 lines
4.4 KiB
Go
173 lines
4.4 KiB
Go
package stateroot
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
|
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
|
"github.com/nspcc-dev/neo-go/pkg/core/stateroot"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
|
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
|
"go.uber.org/atomic"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type (
|
|
// Ledger is an interface to Blockchain sufficient for Service.
|
|
Ledger interface {
|
|
GetConfig() config.ProtocolConfiguration
|
|
HeaderHeight() uint32
|
|
SubscribeForBlocks(ch chan *block.Block)
|
|
UnsubscribeFromBlocks(ch chan *block.Block)
|
|
}
|
|
|
|
// Service represents a state root service.
|
|
Service interface {
|
|
Name() string
|
|
OnPayload(p *payload.Extensible) error
|
|
AddSignature(height uint32, validatorIndex int32, sig []byte) error
|
|
GetConfig() config.StateRoot
|
|
// Start runs service instance in a separate goroutine.
|
|
// The service only starts once, subsequent calls to Start are no-op.
|
|
Start()
|
|
// Shutdown stops the service. It can only be called once, subsequent calls
|
|
// to Shutdown on the same instance are no-op. The instance that was stopped can
|
|
// not be started again by calling Start (use a new instance if needed).
|
|
Shutdown()
|
|
}
|
|
|
|
service struct {
|
|
*stateroot.Module
|
|
chain Ledger
|
|
|
|
MainCfg config.StateRoot
|
|
Network netmode.Magic
|
|
|
|
log *zap.Logger
|
|
started *atomic.Bool
|
|
accMtx sync.RWMutex
|
|
accHeight uint32
|
|
myIndex byte
|
|
wallet *wallet.Wallet
|
|
acc *wallet.Account
|
|
|
|
srMtx sync.Mutex
|
|
incompleteRoots map[uint32]*incompleteRoot
|
|
|
|
timePerBlock time.Duration
|
|
maxRetries int
|
|
relayExtensible RelayCallback
|
|
blockCh chan *block.Block
|
|
stopCh chan struct{}
|
|
done chan struct{}
|
|
}
|
|
)
|
|
|
|
const (
|
|
// Category is a message category for extensible payloads.
|
|
Category = "StateService"
|
|
)
|
|
|
|
// New returns a new state root service instance using the underlying module.
|
|
func New(cfg config.StateRoot, sm *stateroot.Module, log *zap.Logger, bc Ledger, cb RelayCallback) (Service, error) {
|
|
bcConf := bc.GetConfig()
|
|
s := &service{
|
|
Module: sm,
|
|
Network: bcConf.Magic,
|
|
started: atomic.NewBool(false),
|
|
chain: bc,
|
|
log: log,
|
|
incompleteRoots: make(map[uint32]*incompleteRoot),
|
|
blockCh: make(chan *block.Block),
|
|
stopCh: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
timePerBlock: time.Duration(bcConf.SecondsPerBlock) * time.Second,
|
|
maxRetries: voteValidEndInc,
|
|
relayExtensible: cb,
|
|
}
|
|
|
|
s.MainCfg = cfg
|
|
if cfg.Enabled {
|
|
if bcConf.StateRootInHeader {
|
|
return nil, errors.New("`StateRootInHeader` should be disabled when state service is enabled")
|
|
}
|
|
var err error
|
|
w := cfg.UnlockWallet
|
|
if s.wallet, err = wallet.NewWalletFromFile(w.Path); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
haveAccount := false
|
|
for _, acc := range s.wallet.Accounts {
|
|
if err := acc.Decrypt(w.Password, s.wallet.Scrypt); err == nil {
|
|
haveAccount = true
|
|
break
|
|
}
|
|
}
|
|
if !haveAccount {
|
|
return nil, errors.New("no wallet account could be unlocked")
|
|
}
|
|
|
|
s.SetUpdateValidatorsCallback(s.updateValidators)
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// OnPayload implements Service interface.
|
|
func (s *service) OnPayload(ep *payload.Extensible) error {
|
|
m := &Message{}
|
|
r := io.NewBinReaderFromBuf(ep.Data)
|
|
m.DecodeBinary(r)
|
|
if r.Err != nil {
|
|
return r.Err
|
|
}
|
|
switch m.Type {
|
|
case RootT:
|
|
sr := m.Payload.(*state.MPTRoot)
|
|
if sr.Index == 0 {
|
|
return nil
|
|
}
|
|
err := s.AddStateRoot(sr)
|
|
if errors.Is(err, stateroot.ErrStateMismatch) {
|
|
s.log.Error("can't add SV-signed state root", zap.Error(err))
|
|
return nil
|
|
}
|
|
s.srMtx.Lock()
|
|
ir, ok := s.incompleteRoots[sr.Index]
|
|
s.srMtx.Unlock()
|
|
if ok {
|
|
ir.Lock()
|
|
ir.isSent = true
|
|
ir.Unlock()
|
|
}
|
|
return err
|
|
case VoteT:
|
|
v := m.Payload.(*Vote)
|
|
return s.AddSignature(v.Height, v.ValidatorIndex, v.Signature)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *service) updateValidators(height uint32, pubs keys.PublicKeys) {
|
|
s.accMtx.Lock()
|
|
defer s.accMtx.Unlock()
|
|
|
|
s.acc = nil
|
|
for i := range pubs {
|
|
if acc := s.wallet.GetAccount(pubs[i].GetScriptHash()); acc != nil {
|
|
err := acc.Decrypt(s.MainCfg.UnlockWallet.Password, s.wallet.Scrypt)
|
|
if err == nil {
|
|
s.acc = acc
|
|
s.accHeight = height
|
|
s.myIndex = byte(i)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|