Merge pull request #2034 from nspcc-dev/fix-state

Fix MPT branch node strip after deletion
This commit is contained in:
Roman Khimov 2021-07-06 14:05:18 +03:00 committed by GitHub
commit ae8f4ebd5e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 16 deletions

View file

@ -38,8 +38,10 @@ func (b *Batch) Add(key []byte, value []byte) {
// PutBatch puts batch to trie.
// It is not atomic (and probably cannot be without substantial slow-down)
// and returns number of elements processed.
// However each element is being put atomically, so Trie is always in a valid state.
// It is used mostly after the block processing to update MPT and error is not expected.
// If an error is returned, the trie may be in the inconsistent state in case of storage failures.
// This is due to the fact that we can remove multiple children from the branch node simultaneously
// and won't strip the resulting branch node.
// However it is used mostly after the block processing to update MPT and error is not expected.
func (t *Trie) PutBatch(b Batch) (int, error) {
r, n, err := t.putBatch(b.kv)
t.root = r
@ -74,23 +76,31 @@ func (t *Trie) putBatchIntoBranch(curr *BranchNode, kv []keyValue) (Node, int, e
return t.addToBranch(curr, kv, true)
}
func (t *Trie) mergeExtension(prefix []byte, sub Node) Node {
func (t *Trie) mergeExtension(prefix []byte, sub Node) (Node, error) {
switch sn := sub.(type) {
case *ExtensionNode:
t.removeRef(sn.Hash(), sn.bytes)
sn.key = append(prefix, sn.key...)
sn.invalidateCache()
t.addRef(sn.Hash(), sn.bytes)
return sn
return sn, nil
case *HashNode:
return sn
if sn.IsEmpty() {
return sn, nil
}
n, err := t.getFromStore(sn.Hash())
if err != nil {
return sn, err
}
return t.mergeExtension(prefix, n)
default:
if len(prefix) != 0 {
e := NewExtensionNode(prefix, sub)
t.addRef(e.Hash(), e.bytes)
return e
return e, nil
}
return sub
return sub, nil
}
}
@ -103,13 +113,19 @@ func (t *Trie) putBatchIntoExtension(curr *ExtensionNode, kv []keyValue) (Node,
// Extension must be split into new nodes.
stripPrefix(len(curr.key), kv)
sub, n, err := t.putBatchIntoNode(curr.next, kv)
return t.mergeExtension(pref, sub), n, err
if err == nil {
sub, err = t.mergeExtension(pref, sub)
}
return sub, n, err
}
if len(pref) != 0 {
stripPrefix(len(pref), kv)
sub, n, err := t.putBatchIntoExtensionNoPrefix(curr.key[len(pref):], curr.next, kv)
return t.mergeExtension(pref, sub), n, err
if err == nil {
sub, err = t.mergeExtension(pref, sub)
}
return sub, n, err
}
return t.putBatchIntoExtensionNoPrefix(curr.key, curr.next, kv)
}
@ -134,6 +150,15 @@ func (t *Trie) addToBranch(b *BranchNode, kv []keyValue, inTrie bool) (Node, int
if inTrie {
t.removeRef(b.Hash(), b.bytes)
}
// Error during iterate means some storage failure (i.e. some hash node cannot be
// retrieved from storage). This can leave trie in inconsistent state, because
// it can be impossible to strip branch node after it has been changed.
// Consider a branch with 10 children, first 9 of which are deleted and the remaining one
// is a leaf node replaced by a hash node missing from storage.
// This can't be fixed easily because we need to _revert_ changes in reference counts
// for children which were updated successfully. But storage access errors means we are
// in a bad state anyway.
n, err := t.iterateBatch(kv, func(c byte, kv []keyValue) (int, error) {
child, n, err := t.putBatchIntoNode(b.Children[c], kv)
b.Children[c] = child
@ -142,12 +167,19 @@ func (t *Trie) addToBranch(b *BranchNode, kv []keyValue, inTrie bool) (Node, int
if inTrie && n != 0 {
b.invalidateCache()
}
return t.stripBranch(b), n, err
// Even if some of the children can't be put, we need to try to strip branch
// and possibly update refcounts.
nd, bErr := t.stripBranch(b)
if err == nil {
err = bErr
}
return nd, n, err
}
// stripsBranch strips branch node after incomplete batch put.
// It assumes there is no reference to b in trie.
func (t *Trie) stripBranch(b *BranchNode) Node {
func (t *Trie) stripBranch(b *BranchNode) (Node, error) {
var n int
var lastIndex byte
for i := range b.Children {
@ -158,12 +190,12 @@ func (t *Trie) stripBranch(b *BranchNode) Node {
}
switch {
case n == 0:
return new(HashNode)
return new(HashNode), nil
case n == 1:
return t.mergeExtension([]byte{lastIndex}, b.Children[lastIndex])
default:
t.addRef(b.Hash(), b.bytes)
return b
return b, nil
}
}
@ -227,7 +259,10 @@ func (t *Trie) newSubTrieMany(prefix []byte, kv []keyValue, value []byte) (Node,
b.Children[lastChild] = leaf
}
nd, n, err := t.addToBranch(b, kv, false)
return t.mergeExtension(prefix, nd), n, err
if err == nil {
nd, err = t.mergeExtension(prefix, nd)
}
return nd, n, err
}
func stripPrefix(n int, kv []keyValue) {

View file

@ -162,6 +162,18 @@ func TestTrie_PutBatchBranch(t *testing.T) {
tr1, tr2 := prepareBranch(t)
var ps = pairs{{[]byte{0x00, 2}, nil}}
testPut(t, ps, tr1, tr2)
t.Run("non-empty child is hash node", func(t *testing.T) {
tr1, tr2 := prepareBranch(t)
tr1.Flush()
tr1.Collapse(1)
tr2.Flush()
tr2.Collapse(1)
var ps = pairs{{[]byte{0x00, 2}, nil}}
testPut(t, ps, tr1, tr2)
require.IsType(t, (*ExtensionNode)(nil), tr1.root)
})
})
t.Run("incomplete put, transform to extension", func(t *testing.T) {
tr1, tr2 := prepareBranch(t)

View file

@ -195,6 +195,9 @@ func (c *nep17TokenNative) updateAccBalance(ic *interop.Context, acc util.Uint16
err := c.incBalance(ic, acc, &si, amount)
if err != nil {
if si != nil && amount.Sign() < 0 {
_ = ic.DAO.PutStorageItem(c.ID, key, si)
}
return err
}
if si == nil {