forked from TrueCloudLab/frostfs-node
[#446] innerring: Use indexer to get relevant inner ring size and index
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
97fe50d452
commit
80bfd08a47
4 changed files with 110 additions and 20 deletions
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/misc"
|
||||
"github.com/spf13/viper"
|
||||
|
@ -108,5 +109,7 @@ func defaultConfiguration(cfg *viper.Viper) {
|
|||
|
||||
cfg.SetDefault("settlement.basic_income_rate", 0)
|
||||
|
||||
cfg.SetDefault("indexer.cache_timeout", 15*time.Second)
|
||||
|
||||
cfg.SetDefault("locode.db.path", "")
|
||||
}
|
||||
|
|
80
pkg/innerring/indexer.go
Normal file
80
pkg/innerring/indexer.go
Normal file
|
@ -0,0 +1,80 @@
|
|||
package innerring
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/innerring/invoke"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type innerRingIndexer struct {
|
||||
sync.RWMutex
|
||||
|
||||
cli *client.Client
|
||||
key *ecdsa.PublicKey
|
||||
timeout time.Duration
|
||||
|
||||
innerRingIndex, innerRingSize int32
|
||||
|
||||
lastAccess time.Time
|
||||
}
|
||||
|
||||
func newInnerRingIndexer(cli *client.Client, key *ecdsa.PublicKey, to time.Duration) *innerRingIndexer {
|
||||
return &innerRingIndexer{
|
||||
cli: cli,
|
||||
key: key,
|
||||
timeout: to,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *innerRingIndexer) update() (err error) {
|
||||
s.RLock()
|
||||
|
||||
if time.Since(s.lastAccess) < s.timeout {
|
||||
s.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
s.RUnlock()
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if time.Since(s.lastAccess) < s.timeout {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.innerRingIndex, s.innerRingSize, err = invoke.InnerRingIndex(s.cli, s.key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.lastAccess = time.Now()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *innerRingIndexer) InnerRingIndex() (int32, error) {
|
||||
if err := s.update(); err != nil {
|
||||
return 0, errors.Wrap(err, "can't update index state")
|
||||
}
|
||||
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
return s.innerRingIndex, nil
|
||||
}
|
||||
|
||||
func (s *innerRingIndexer) InnerRingSize() (int32, error) {
|
||||
if err := s.update(); err != nil {
|
||||
return 0, errors.Wrap(err, "can't update index state")
|
||||
}
|
||||
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
return s.innerRingSize, nil
|
||||
}
|
|
@ -48,13 +48,12 @@ type (
|
|||
epochTimer *timers.BlockTimer
|
||||
|
||||
// global state
|
||||
morphClient *client.Client
|
||||
mainnetClient *client.Client
|
||||
epochCounter atomic.Uint64
|
||||
innerRingIndex atomic.Int32
|
||||
innerRingSize atomic.Int32
|
||||
precision precision.Fixed8Converter
|
||||
auditClient *auditWrapper.ClientWrapper
|
||||
morphClient *client.Client
|
||||
mainnetClient *client.Client
|
||||
epochCounter atomic.Uint64
|
||||
statusIndex *innerRingIndexer
|
||||
precision precision.Fixed8Converter
|
||||
auditClient *auditWrapper.ClientWrapper
|
||||
|
||||
notaryDepositAmount fixedn.Fixed8
|
||||
notaryDuration uint32
|
||||
|
@ -297,6 +296,12 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
|
|||
|
||||
server.pubKey = crypto.MarshalPublicKey(&server.key.PublicKey)
|
||||
|
||||
server.statusIndex = newInnerRingIndexer(
|
||||
server.morphClient,
|
||||
&server.key.PublicKey,
|
||||
cfg.GetDuration("indexer.cache_timeout"),
|
||||
)
|
||||
|
||||
auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -711,14 +716,6 @@ func (s *Server) initConfigFromBlockchain() error {
|
|||
return errors.Wrap(err, "can't read epoch")
|
||||
}
|
||||
|
||||
key := &s.key.PublicKey
|
||||
|
||||
// check if node inside inner ring list and what index it has
|
||||
index, size, err := invoke.InnerRingIndex(s.mainnetClient, key)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't read inner ring list")
|
||||
}
|
||||
|
||||
// get balance precision
|
||||
balancePrecision, err := invoke.BalancePrecision(s.morphClient, s.contracts.balance)
|
||||
if err != nil {
|
||||
|
@ -726,8 +723,6 @@ func (s *Server) initConfigFromBlockchain() error {
|
|||
}
|
||||
|
||||
s.epochCounter.Store(uint64(epoch))
|
||||
s.innerRingSize.Store(size)
|
||||
s.innerRingIndex.Store(index)
|
||||
s.precision.SetBalancePrecision(balancePrecision)
|
||||
|
||||
s.log.Debug("read config from blockchain",
|
||||
|
|
|
@ -21,19 +21,31 @@ func (s *Server) SetEpochCounter(val uint64) {
|
|||
|
||||
// IsActive is a getter for a global active flag state.
|
||||
func (s *Server) IsActive() bool {
|
||||
return s.innerRingIndex.Load() >= 0
|
||||
return s.Index() >= 0
|
||||
}
|
||||
|
||||
// Index is a getter for a global index of node in inner ring list. Negative
|
||||
// index means that node is not in the inner ring list.
|
||||
func (s *Server) Index() int {
|
||||
return int(s.innerRingIndex.Load())
|
||||
index, err := s.statusIndex.InnerRingIndex()
|
||||
if err != nil {
|
||||
s.log.Error("can't get inner ring index", zap.String("error", err.Error()))
|
||||
return -1
|
||||
}
|
||||
|
||||
return int(index)
|
||||
}
|
||||
|
||||
// InnerRingSize is a getter for a global size of inner ring list. This value
|
||||
// paired with inner ring index.
|
||||
func (s *Server) InnerRingSize() int {
|
||||
return int(s.innerRingSize.Load())
|
||||
size, err := s.statusIndex.InnerRingSize()
|
||||
if err != nil {
|
||||
s.log.Error("can't get inner ring size", zap.String("error", err.Error()))
|
||||
return 0
|
||||
}
|
||||
|
||||
return int(size)
|
||||
}
|
||||
|
||||
func (s *Server) voteForSidechainValidator(validators []keys.PublicKey) error {
|
||||
|
|
Loading…
Reference in a new issue