core: keep Designation cache always valid and up-to-date

Always use cache instead of DAO where possible. Update cache in-place
each time new designated node is chosen.
This commit is contained in:
Anna Shaleva 2022-04-19 14:35:19 +03:00
parent c8bdd2ad1a
commit 11ab42d91c
4 changed files with 201 additions and 47 deletions

View file

@ -594,7 +594,10 @@ func (bc *Blockchain) initializeNativeCache(d *dao.Simple) error {
if err != nil {
return fmt.Errorf("can't init cache for Management native contract: %w", err)
}
bc.contracts.Designate.InitializeCache(d)
err = bc.contracts.Designate.InitializeCache(d)
if err != nil {
return fmt.Errorf("can't init cache for Designation native contract: %w", err)
}
bc.contracts.Oracle.InitializeCache(d)
if bc.P2PSigExtensionsEnabled() {
err = bc.contracts.Notary.InitializeCache(d)

View file

@ -3,6 +3,7 @@ package native
import (
"encoding/binary"
"errors"
"fmt"
"math"
"math/big"
"sort"
@ -48,6 +49,8 @@ type roleData struct {
}
type DesignationCache struct {
// rolesChangedFlag shows whether any of designated nodes were changed within the current block.
// It is used to notify dependant services about updated node roles during PostPersist.
rolesChangedFlag bool
oracles roleData
stateVals roleData
@ -120,14 +123,33 @@ func newDesignate(p2pSigExtensionsEnabled bool) *Designate {
return s
}
// Initialize initializes Oracle contract.
// Initialize initializes Designation contract. It is called once at native Management's OnPersist
// at the genesis block, and we can't properly fill the cache at this point, as there are no roles
// data in the storage.
func (s *Designate) Initialize(ic *interop.Context) error {
cache := &DesignationCache{}
cache.rolesChangedFlag = true
ic.DAO.Store.SetCache(s.ID, cache)
return nil
}
// InitializeCache fills native Designate cache from DAO. It is called at non-zero height, thus
// we can fetch the roles data right from the storage.
func (s *Designate) InitializeCache(d *dao.Simple) error {
cache := &DesignationCache{}
roles := []noderoles.Role{noderoles.Oracle, noderoles.NeoFSAlphabet, noderoles.StateValidator}
if s.p2pSigExtensionsEnabled {
roles = append(roles, noderoles.P2PNotary)
}
for _, r := range roles {
err := s.updateCachedRoleData(cache, d, r)
if err != nil {
return fmt.Errorf("failed to get nodes from storage for %d role: %w", r, err)
}
}
d.Store.SetCache(s.ID, cache)
return nil
}
// OnPersist implements Contract interface.
func (s *Designate) OnPersist(ic *interop.Context) error {
return nil
@ -136,23 +158,15 @@ func (s *Designate) OnPersist(ic *interop.Context) error {
// PostPersist implements Contract interface.
func (s *Designate) PostPersist(ic *interop.Context) error {
cache := ic.DAO.Store.GetRWCache(s.ID).(*DesignationCache)
if !rolesChanged(cache) {
if !cache.rolesChangedFlag {
return nil
}
if err := s.updateCachedRoleData(&cache.oracles, ic.DAO, noderoles.Oracle); err != nil {
return err
}
if err := s.updateCachedRoleData(&cache.stateVals, ic.DAO, noderoles.StateValidator); err != nil {
return err
}
if err := s.updateCachedRoleData(&cache.neofsAlphabet, ic.DAO, noderoles.NeoFSAlphabet); err != nil {
return err
}
s.notifyRoleChanged(&cache.oracles, noderoles.Oracle)
s.notifyRoleChanged(&cache.stateVals, noderoles.StateValidator)
s.notifyRoleChanged(&cache.neofsAlphabet, noderoles.NeoFSAlphabet)
if s.p2pSigExtensionsEnabled {
if err := s.updateCachedRoleData(&cache.notaries, ic.DAO, noderoles.P2PNotary); err != nil {
return err
}
s.notifyRoleChanged(&cache.notaries, noderoles.P2PNotary)
}
cache.rolesChangedFlag = false
@ -184,10 +198,6 @@ func (s *Designate) getDesignatedByRole(ic *interop.Context, args []stackitem.It
return pubsToArray(pubs)
}
func rolesChanged(cache *DesignationCache) bool {
return cache.rolesChangedFlag
}
func (s *Designate) hashFromNodes(r noderoles.Role, nodes keys.PublicKeys) util.Uint160 {
if len(nodes) == 0 {
return util.Uint160{}
@ -202,29 +212,46 @@ func (s *Designate) hashFromNodes(r noderoles.Role, nodes keys.PublicKeys) util.
return hash.Hash160(script)
}
func (s *Designate) updateCachedRoleData(v *roleData, d *dao.Simple, r noderoles.Role) error {
nodeKeys, height, err := s.GetDesignatedByRole(d, r, math.MaxUint32)
// updateCachedRoleData fetches the most recent role data from the storage and
// updates the given cache.
func (s *Designate) updateCachedRoleData(cache *DesignationCache, d *dao.Simple, r noderoles.Role) error {
var v *roleData
switch r {
case noderoles.Oracle:
v = &cache.oracles
case noderoles.StateValidator:
v = &cache.stateVals
case noderoles.NeoFSAlphabet:
v = &cache.neofsAlphabet
case noderoles.P2PNotary:
v = &cache.notaries
}
nodeKeys, height, err := s.getDesignatedByRoleFromStorage(d, r, math.MaxUint32)
if err != nil {
return err
}
v.nodes = nodeKeys
v.addr = s.hashFromNodes(r, nodeKeys)
v.height = height
cache.rolesChangedFlag = true
return nil
}
func (s *Designate) notifyRoleChanged(v *roleData, r noderoles.Role) {
switch r {
case noderoles.Oracle:
if orc, _ := s.OracleService.Load().(services.Oracle); orc != nil {
orc.UpdateOracleNodes(nodeKeys.Copy())
orc.UpdateOracleNodes(v.nodes.Copy())
}
case noderoles.P2PNotary:
if ntr, _ := s.NotaryService.Load().(services.Notary); ntr != nil {
ntr.UpdateNotaryNodes(nodeKeys.Copy())
ntr.UpdateNotaryNodes(v.nodes.Copy())
}
case noderoles.StateValidator:
if s.StateRootService != nil {
s.StateRootService.UpdateStateValidators(height, nodeKeys.Copy())
s.StateRootService.UpdateStateValidators(v.height, v.nodes.Copy())
}
}
return nil
}
func getCachedRoleData(cache *DesignationCache, r noderoles.Role) *roleData {
@ -247,17 +274,10 @@ func (s *Designate) GetLastDesignatedHash(d *dao.Simple, r noderoles.Role) (util
return util.Uint160{}, ErrInvalidRole
}
cache := d.Store.GetROCache(s.ID).(*DesignationCache)
if !rolesChanged(cache) {
if val := getCachedRoleData(cache, r); val != nil {
return val.addr, nil
}
if val := getCachedRoleData(cache, r); val != nil {
return val.addr, nil
}
nodes, _, err := s.GetDesignatedByRole(d, r, math.MaxUint32)
if err != nil {
return util.Uint160{}, err
}
// We only have hashing defined for oracles now.
return s.hashFromNodes(r, nodes), nil
return util.Uint160{}, nil
}
// GetDesignatedByRole returns nodes for role r.
@ -266,11 +286,21 @@ func (s *Designate) GetDesignatedByRole(d *dao.Simple, r noderoles.Role, index u
return nil, 0, ErrInvalidRole
}
cache := d.Store.GetROCache(s.ID).(*DesignationCache)
if !rolesChanged(cache) {
if val := getCachedRoleData(cache, r); val != nil && val.height <= index {
if val := getCachedRoleData(cache, r); val != nil {
if val.height <= index {
return val.nodes.Copy(), val.height, nil
}
} else {
// Cache is always valid, thus if there's no cache then there's no designated nodes for this role.
return nil, 0, nil
}
// Cache stores only latest designated nodes, so if the old info is requested, then we still need
// to search in the storage.
return s.getDesignatedByRoleFromStorage(d, r, index)
}
// getDesignatedByRoleFromStorage returns nodes for role r from the storage.
func (s *Designate) getDesignatedByRoleFromStorage(d *dao.Simple, r noderoles.Role, index uint32) (keys.PublicKeys, uint32, error) {
var (
ns NodeList
bestIndex uint32
@ -344,12 +374,18 @@ func (s *Designate) DesignateAsRole(ic *interop.Context, r noderoles.Role, pubs
}
sort.Sort(pubs)
nl := NodeList(pubs)
ic.DAO.Store.GetRWCache(s.ID).(*DesignationCache).rolesChangedFlag = true
err := putConvertibleToDAO(s.ID, ic.DAO, key, &nl)
if err != nil {
return err
}
cache := ic.DAO.Store.GetRWCache(s.ID).(*DesignationCache)
err = s.updateCachedRoleData(cache, ic.DAO, r)
if err != nil {
return fmt.Errorf("failed to update Designation role data cache: %w", err)
}
ic.Notifications = append(ic.Notifications, state.NotificationEvent{
ScriptHash: s.Hash,
Name: DesignationEventName,
@ -372,10 +408,3 @@ func (s *Designate) getRole(item stackitem.Item) (noderoles.Role, bool) {
u := bi.Uint64()
return noderoles.Role(u), u <= math.MaxUint8 && s.isValidRole(noderoles.Role(u))
}
// InitializeCache invalidates native Designate cache.
func (s *Designate) InitializeCache(d *dao.Simple) {
cache := &DesignationCache{}
cache.rolesChangedFlag = true
d.Store.SetCache(s.ID, cache)
}

View file

@ -8,9 +8,12 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/neotest"
"github.com/nspcc-dev/neo-go/pkg/neotest/chain"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/stretchr/testify/require"
)
@ -77,6 +80,33 @@ func testGetSet(t *testing.T, c *neotest.ContractInvoker, name string, defaultVa
})
}
func testGetSetCache(t *testing.T, c *neotest.ContractInvoker, name string, defaultValue int64) {
getName := "get" + name
setName := "set" + name
committeeInvoker := c.WithSigners(c.Committee)
newVal := defaultValue - 1
// Change fee, abort the transaction and check that contract cache wasn't persisted
// for FAULTed tx at the same block.
w := io.NewBufBinWriter()
emit.AppCall(w.BinWriter, committeeInvoker.Hash, setName, callflag.All, newVal)
emit.Opcodes(w.BinWriter, opcode.ABORT)
tx1 := committeeInvoker.PrepareInvocation(t, w.Bytes(), committeeInvoker.Signers)
tx2 := committeeInvoker.PrepareInvoke(t, getName)
committeeInvoker.AddNewBlock(t, tx1, tx2)
committeeInvoker.CheckFault(t, tx1.Hash(), "ABORT")
committeeInvoker.CheckHalt(t, tx2.Hash(), stackitem.Make(defaultValue))
// Change fee and check that change is available for the next tx.
tx1 = committeeInvoker.PrepareInvoke(t, setName, newVal)
tx2 = committeeInvoker.PrepareInvoke(t, getName)
committeeInvoker.AddNewBlock(t, tx1, tx2)
committeeInvoker.CheckHalt(t, tx1.Hash())
committeeInvoker.CheckHalt(t, tx2.Hash(), stackitem.Make(newVal))
}
func setNodesByRole(t *testing.T, designateInvoker *neotest.ContractInvoker, ok bool, r noderoles.Role, nodes keys.PublicKeys) {
pubs := make([]interface{}, len(nodes))
for i := range nodes {

View file

@ -5,8 +5,15 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/native/nativenames"
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/neotest"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
"github.com/nspcc-dev/neo-go/pkg/vm/opcode"
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
"github.com/stretchr/testify/require"
)
@ -45,3 +52,88 @@ func TestDesignate_DesignateAsRole(t *testing.T) {
checkNodeRoles(t, designateInvoker, true, noderoles.NeoFSAlphabet, e.Chain.BlockHeight()+1, pubs)
})
}
type dummyOracle struct {
updateNodes func(k keys.PublicKeys)
}
// AddRequests processes new requests.
func (o *dummyOracle) AddRequests(map[uint64]*state.OracleRequest) {
}
// RemoveRequests removes already processed requests.
func (o *dummyOracle) RemoveRequests([]uint64) {
panic("TODO")
}
// UpdateOracleNodes updates oracle nodes.
func (o *dummyOracle) UpdateOracleNodes(k keys.PublicKeys) {
if o.updateNodes != nil {
o.updateNodes(k)
return
}
panic("TODO")
}
// UpdateNativeContract updates oracle contract native script and hash.
func (o *dummyOracle) UpdateNativeContract([]byte, []byte, util.Uint160, int) {
}
// Start runs oracle module.
func (o *dummyOracle) Start() {
panic("TODO")
}
// Shutdown shutdowns oracle module.
func (o *dummyOracle) Shutdown() {
panic("TODO")
}
func TestDesignate_Cache(t *testing.T) {
c := newDesignateClient(t)
e := c.Executor
designateInvoker := c.WithSigners(c.Committee)
r := int64(noderoles.Oracle)
var (
updatedNodes keys.PublicKeys
updateCalled bool
)
oracleServ := &dummyOracle{
updateNodes: func(k keys.PublicKeys) {
updatedNodes = k
updateCalled = true
},
}
privGood, err := keys.NewPrivateKey()
require.NoError(t, err)
pubsGood := []interface{}{privGood.PublicKey().Bytes()}
privBad, err := keys.NewPrivateKey()
require.NoError(t, err)
pubsBad := []interface{}{privBad.PublicKey().Bytes()}
// Firstly, designate good Oracle node and check that OracleService callback was called during PostPersist.
e.Chain.SetOracle(oracleServ)
txDesignateGood := designateInvoker.PrepareInvoke(t, "designateAsRole", r, pubsGood)
e.AddNewBlock(t, txDesignateGood)
e.CheckHalt(t, txDesignateGood.Hash(), stackitem.Null{})
require.True(t, updateCalled)
require.Equal(t, keys.PublicKeys{privGood.PublicKey()}, updatedNodes)
updatedNodes = nil
updateCalled = false
// Check designated node in a separate block.
checkNodeRoles(t, designateInvoker, true, noderoles.Oracle, e.Chain.BlockHeight()+1, keys.PublicKeys{privGood.PublicKey()})
// Designate privBad as oracle node and abort the transaction. Designation cache changes
// shouldn't be persisted to the contract and no notification should be sent.
w := io.NewBufBinWriter()
emit.AppCall(w.BinWriter, designateInvoker.Hash, "designateAsRole", callflag.All, int64(r), pubsBad)
emit.Opcodes(w.BinWriter, opcode.ABORT)
require.NoError(t, w.Err)
script := w.Bytes()
designateInvoker.InvokeScriptCheckFAULT(t, script, designateInvoker.Signers, "ABORT")
require.Nil(t, updatedNodes)
require.False(t, updateCalled)
}