Merge pull request #485 from nspcc-dev/close-blockchain

core: add Close() to blockchainer, implement it to properly close chain
This commit is contained in:
Roman Khimov 2019-11-08 12:24:42 +03:00 committed by GitHub
commit 4d770e3c37
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 50 additions and 23 deletions

View file

@ -154,14 +154,11 @@ func dumpDB(ctx *cli.Context) error {
defer outStream.Close()
writer := io.NewBinWriterFromIO(outStream)
grace, cancel := context.WithCancel(newGraceContext())
defer cancel()
chain, err := initBlockChain(cfg)
if err != nil {
return cli.NewExitError(err, 1)
}
go chain.Run(grace)
go chain.Run()
chainHeight := chain.BlockHeight()
if skip+count > chainHeight {
@ -182,6 +179,7 @@ func dumpDB(ctx *cli.Context) error {
return cli.NewExitError(err, 1)
}
}
chain.Close()
return nil
}
func restoreDB(ctx *cli.Context) error {
@ -204,14 +202,11 @@ func restoreDB(ctx *cli.Context) error {
defer inStream.Close()
reader := io.NewBinReaderFromIO(inStream)
grace, cancel := context.WithCancel(newGraceContext())
defer cancel()
chain, err := initBlockChain(cfg)
if err != nil {
return err
}
go chain.Run(grace)
go chain.Run()
var allBlocks uint32
reader.ReadLE(&allBlocks)
@ -243,6 +238,7 @@ func restoreDB(ctx *cli.Context) error {
return cli.NewExitError(fmt.Errorf("failed to add block %d: %s", i, err), 1)
}
}
chain.Close()
return nil
}
@ -272,7 +268,7 @@ func startServer(ctx *cli.Context) error {
errChan := make(chan error)
monitoring := metrics.NewMetricsService(cfg.ApplicationConfiguration.Monitoring)
go chain.Run(grace)
go chain.Run()
go server.Start(errChan)
go rpcServer.Start(errChan)
go monitoring.Start()
@ -295,6 +291,7 @@ Main:
shutdownErr = errors.Wrap(serverErr, "Error encountered whilst shutting down server")
}
monitoring.ShutDown()
chain.Close()
break Main
}
}

View file

@ -2,7 +2,6 @@ package core
import (
"bytes"
"context"
"fmt"
"math"
"sort"
@ -65,6 +64,10 @@ type Blockchain struct {
headersOp chan headersOpFunc
headersOpDone chan struct{}
// Stop synchronization mechanisms.
stopCh chan struct{}
runToExitCh chan struct{}
memPool MemPool
}
@ -78,6 +81,8 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha
store: storage.NewMemCachedStore(s),
headersOp: make(chan headersOpFunc),
headersOpDone: make(chan struct{}),
stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}),
memPool: NewMemPool(50000),
}
@ -178,7 +183,7 @@ func (bc *Blockchain) init() error {
}
// Run runs chain loop.
func (bc *Blockchain) Run(ctx context.Context) {
func (bc *Blockchain) Run() {
persistTimer := time.NewTimer(persistInterval)
defer func() {
persistTimer.Stop()
@ -188,10 +193,11 @@ func (bc *Blockchain) Run(ctx context.Context) {
if err := bc.store.Close(); err != nil {
log.Warnf("failed to close db: %s", err)
}
close(bc.runToExitCh)
}()
for {
select {
case <-ctx.Done():
case <-bc.stopCh:
return
case op := <-bc.headersOp:
op(bc.headerList)
@ -208,6 +214,13 @@ func (bc *Blockchain) Run(ctx context.Context) {
}
}
// Close stops Blockchain's internal loop, syncs changes to persistent storage
// and closes it. The Blockchain is no longer functional after the call to Close.
func (bc *Blockchain) Close() {
close(bc.stopCh)
<-bc.runToExitCh
}
// AddBlock accepts successive block for the Blockchain, verifies it and
// stores internally. Eventually it will be persisted to the backing storage.
func (bc *Blockchain) AddBlock(block *Block) error {

View file

@ -155,3 +155,23 @@ func TestGetTransaction(t *testing.T) {
assert.NoError(t, bc.persist())
}
}
func TestClose(t *testing.T) {
defer func() {
r := recover()
assert.NotNil(t, r)
}()
bc := newTestChain(t)
blocks := makeBlocks(10)
for i := 0; i < len(blocks); i++ {
require.NoError(t, bc.AddBlock(blocks[i]))
}
bc.Close()
// It's a hack, but we use internal knowledge of MemoryStore
// implementation which makes it completely unusable (up to panicing)
// after Close().
_ = bc.store.Put([]byte{0}, []byte{1})
// This should never be executed.
assert.Nil(t, t)
}

View file

@ -15,6 +15,7 @@ type Blockchainer interface {
AddHeaders(...*Header) error
AddBlock(*Block) error
BlockHeight() uint32
Close()
HeaderHeight() uint32
GetBlock(hash util.Uint256) (*Block, error)
GetContractState(hash util.Uint160) *ContractState

View file

@ -1,7 +1,6 @@
package core
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
@ -41,7 +40,7 @@ func newTestChain(t *testing.T) *Blockchain {
if err != nil {
t.Fatal(err)
}
go chain.Run(context.Background())
go chain.Run()
zeroHash, err := chain.GetHeader(chain.GetHeaderHash(0))
require.Nil(t, err)
newBlockPrevHash = zeroHash.Hash()

View file

@ -52,6 +52,9 @@ func (chain *testChain) AddBlock(block *core.Block) error {
func (chain *testChain) BlockHeight() uint32 {
return atomic.LoadUint32(&chain.blockheight)
}
func (chain *testChain) Close() {
panic("TODO")
}
func (chain testChain) HeaderHeight() uint32 {
return 0
}

View file

@ -1,7 +1,6 @@
package rpc
import (
"context"
"net/http"
"os"
"testing"
@ -139,7 +138,7 @@ type GetAccountStateResponse struct {
ID int `json:"id"`
}
func initServerWithInMemoryChain(ctx context.Context, t *testing.T) (*core.Blockchain, http.HandlerFunc) {
func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFunc) {
var nBlocks uint32
net := config.ModeUnitTestNet
@ -151,7 +150,7 @@ func initServerWithInMemoryChain(ctx context.Context, t *testing.T) (*core.Block
chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration)
require.NoError(t, err, "could not create chain")
go chain.Run(ctx)
go chain.Run()
f, err := os.Open("testdata/50testblocks.acc")
require.Nil(t, err)

View file

@ -2,7 +2,6 @@ package rpc
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
@ -10,16 +9,12 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestRPC(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
chain, handler := initServerWithInMemoryChain(ctx, t)
chain, handler := initServerWithInMemoryChain(t)
t.Run("getbestblockhash", func(t *testing.T) {
rpc := `{"jsonrpc": "2.0", "id": 1, "method": "getbestblockhash", "params": []}`