core: add Close() to blockchainer, implement it to properly close chain

Before it the deferred function in Run() was actually never able to properly
close the Store, so we weren't synching the latest state to the disk.
This commit is contained in:
Roman Khimov 2019-11-07 20:47:48 +03:00
parent d33083e1e1
commit b05754deac
8 changed files with 50 additions and 23 deletions

View file

@ -154,14 +154,11 @@ func dumpDB(ctx *cli.Context) error {
defer outStream.Close() defer outStream.Close()
writer := io.NewBinWriterFromIO(outStream) writer := io.NewBinWriterFromIO(outStream)
grace, cancel := context.WithCancel(newGraceContext())
defer cancel()
chain, err := initBlockChain(cfg) chain, err := initBlockChain(cfg)
if err != nil { if err != nil {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
go chain.Run(grace) go chain.Run()
chainHeight := chain.BlockHeight() chainHeight := chain.BlockHeight()
if skip+count > chainHeight { if skip+count > chainHeight {
@ -182,6 +179,7 @@ func dumpDB(ctx *cli.Context) error {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
} }
chain.Close()
return nil return nil
} }
func restoreDB(ctx *cli.Context) error { func restoreDB(ctx *cli.Context) error {
@ -204,14 +202,11 @@ func restoreDB(ctx *cli.Context) error {
defer inStream.Close() defer inStream.Close()
reader := io.NewBinReaderFromIO(inStream) reader := io.NewBinReaderFromIO(inStream)
grace, cancel := context.WithCancel(newGraceContext())
defer cancel()
chain, err := initBlockChain(cfg) chain, err := initBlockChain(cfg)
if err != nil { if err != nil {
return err return err
} }
go chain.Run(grace) go chain.Run()
var allBlocks uint32 var allBlocks uint32
reader.ReadLE(&allBlocks) reader.ReadLE(&allBlocks)
@ -243,6 +238,7 @@ func restoreDB(ctx *cli.Context) error {
return cli.NewExitError(fmt.Errorf("failed to add block %d: %s", i, err), 1) return cli.NewExitError(fmt.Errorf("failed to add block %d: %s", i, err), 1)
} }
} }
chain.Close()
return nil return nil
} }
@ -272,7 +268,7 @@ func startServer(ctx *cli.Context) error {
errChan := make(chan error) errChan := make(chan error)
monitoring := metrics.NewMetricsService(cfg.ApplicationConfiguration.Monitoring) monitoring := metrics.NewMetricsService(cfg.ApplicationConfiguration.Monitoring)
go chain.Run(grace) go chain.Run()
go server.Start(errChan) go server.Start(errChan)
go rpcServer.Start(errChan) go rpcServer.Start(errChan)
go monitoring.Start() go monitoring.Start()
@ -295,6 +291,7 @@ Main:
shutdownErr = errors.Wrap(serverErr, "Error encountered whilst shutting down server") shutdownErr = errors.Wrap(serverErr, "Error encountered whilst shutting down server")
} }
monitoring.ShutDown() monitoring.ShutDown()
chain.Close()
break Main break Main
} }
} }

View file

@ -2,7 +2,6 @@ package core
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"math" "math"
"sort" "sort"
@ -65,6 +64,10 @@ type Blockchain struct {
headersOp chan headersOpFunc headersOp chan headersOpFunc
headersOpDone chan struct{} headersOpDone chan struct{}
// Stop synchronization mechanisms.
stopCh chan struct{}
runToExitCh chan struct{}
memPool MemPool memPool MemPool
} }
@ -78,6 +81,8 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha
store: storage.NewMemCachedStore(s), store: storage.NewMemCachedStore(s),
headersOp: make(chan headersOpFunc), headersOp: make(chan headersOpFunc),
headersOpDone: make(chan struct{}), headersOpDone: make(chan struct{}),
stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}),
memPool: NewMemPool(50000), memPool: NewMemPool(50000),
} }
@ -178,7 +183,7 @@ func (bc *Blockchain) init() error {
} }
// Run runs chain loop. // Run runs chain loop.
func (bc *Blockchain) Run(ctx context.Context) { func (bc *Blockchain) Run() {
persistTimer := time.NewTimer(persistInterval) persistTimer := time.NewTimer(persistInterval)
defer func() { defer func() {
persistTimer.Stop() persistTimer.Stop()
@ -188,10 +193,11 @@ func (bc *Blockchain) Run(ctx context.Context) {
if err := bc.store.Close(); err != nil { if err := bc.store.Close(); err != nil {
log.Warnf("failed to close db: %s", err) log.Warnf("failed to close db: %s", err)
} }
close(bc.runToExitCh)
}() }()
for { for {
select { select {
case <-ctx.Done(): case <-bc.stopCh:
return return
case op := <-bc.headersOp: case op := <-bc.headersOp:
op(bc.headerList) op(bc.headerList)
@ -208,6 +214,13 @@ func (bc *Blockchain) Run(ctx context.Context) {
} }
} }
// Close stops Blockchain's internal loop, syncs changes to persistent storage
// and closes it. The Blockchain is no longer functional after the call to Close.
func (bc *Blockchain) Close() {
close(bc.stopCh)
<-bc.runToExitCh
}
// AddBlock accepts successive block for the Blockchain, verifies it and // AddBlock accepts successive block for the Blockchain, verifies it and
// stores internally. Eventually it will be persisted to the backing storage. // stores internally. Eventually it will be persisted to the backing storage.
func (bc *Blockchain) AddBlock(block *Block) error { func (bc *Blockchain) AddBlock(block *Block) error {

View file

@ -155,3 +155,23 @@ func TestGetTransaction(t *testing.T) {
assert.NoError(t, bc.persist()) assert.NoError(t, bc.persist())
} }
} }
func TestClose(t *testing.T) {
defer func() {
r := recover()
assert.NotNil(t, r)
}()
bc := newTestChain(t)
blocks := makeBlocks(10)
for i := 0; i < len(blocks); i++ {
require.NoError(t, bc.AddBlock(blocks[i]))
}
bc.Close()
// It's a hack, but we use internal knowledge of MemoryStore
// implementation which makes it completely unusable (up to panicing)
// after Close().
_ = bc.store.Put([]byte{0}, []byte{1})
// This should never be executed.
assert.Nil(t, t)
}

View file

@ -15,6 +15,7 @@ type Blockchainer interface {
AddHeaders(...*Header) error AddHeaders(...*Header) error
AddBlock(*Block) error AddBlock(*Block) error
BlockHeight() uint32 BlockHeight() uint32
Close()
HeaderHeight() uint32 HeaderHeight() uint32
GetBlock(hash util.Uint256) (*Block, error) GetBlock(hash util.Uint256) (*Block, error)
GetContractState(hash util.Uint160) *ContractState GetContractState(hash util.Uint160) *ContractState

View file

@ -1,7 +1,6 @@
package core package core
import ( import (
"context"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -41,7 +40,7 @@ func newTestChain(t *testing.T) *Blockchain {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
go chain.Run(context.Background()) go chain.Run()
zeroHash, err := chain.GetHeader(chain.GetHeaderHash(0)) zeroHash, err := chain.GetHeader(chain.GetHeaderHash(0))
require.Nil(t, err) require.Nil(t, err)
newBlockPrevHash = zeroHash.Hash() newBlockPrevHash = zeroHash.Hash()

View file

@ -52,6 +52,9 @@ func (chain *testChain) AddBlock(block *core.Block) error {
func (chain *testChain) BlockHeight() uint32 { func (chain *testChain) BlockHeight() uint32 {
return atomic.LoadUint32(&chain.blockheight) return atomic.LoadUint32(&chain.blockheight)
} }
func (chain *testChain) Close() {
panic("TODO")
}
func (chain testChain) HeaderHeight() uint32 { func (chain testChain) HeaderHeight() uint32 {
return 0 return 0
} }

View file

@ -1,7 +1,6 @@
package rpc package rpc
import ( import (
"context"
"net/http" "net/http"
"os" "os"
"testing" "testing"
@ -139,7 +138,7 @@ type GetAccountStateResponse struct {
ID int `json:"id"` ID int `json:"id"`
} }
func initServerWithInMemoryChain(ctx context.Context, t *testing.T) (*core.Blockchain, http.HandlerFunc) { func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFunc) {
var nBlocks uint32 var nBlocks uint32
net := config.ModeUnitTestNet net := config.ModeUnitTestNet
@ -151,7 +150,7 @@ func initServerWithInMemoryChain(ctx context.Context, t *testing.T) (*core.Block
chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration) chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration)
require.NoError(t, err, "could not create chain") require.NoError(t, err, "could not create chain")
go chain.Run(ctx) go chain.Run()
f, err := os.Open("testdata/50testblocks.acc") f, err := os.Open("testdata/50testblocks.acc")
require.Nil(t, err) require.Nil(t, err)

View file

@ -2,7 +2,6 @@ package rpc
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -10,16 +9,12 @@ import (
"net/http/httptest" "net/http/httptest"
"strings" "strings"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestRPC(t *testing.T) { func TestRPC(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) chain, handler := initServerWithInMemoryChain(t)
defer cancel()
chain, handler := initServerWithInMemoryChain(ctx, t)
t.Run("getbestblockhash", func(t *testing.T) { t.Run("getbestblockhash", func(t *testing.T) {
rpc := `{"jsonrpc": "2.0", "id": 1, "method": "getbestblockhash", "params": []}` rpc := `{"jsonrpc": "2.0", "id": 1, "method": "getbestblockhash", "params": []}`