From b05754deac0fa4ee058da5d0c327dfe216ca4dd5 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 7 Nov 2019 20:47:48 +0300 Subject: [PATCH] core: add Close() to blockchainer, implement it to properly close chain Before it the deferred function in Run() was actually never able to properly close the Store, so we weren't synching the latest state to the disk. --- cli/server/server.go | 15 ++++++--------- pkg/core/blockchain.go | 19 ++++++++++++++++--- pkg/core/blockchain_test.go | 20 ++++++++++++++++++++ pkg/core/blockchainer.go | 1 + pkg/core/helper_test.go | 3 +-- pkg/network/helper_test.go | 3 +++ pkg/rpc/server_helper_test.go | 5 ++--- pkg/rpc/server_test.go | 7 +------ 8 files changed, 50 insertions(+), 23 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index 141e3220a..8eb9a7cf2 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -154,14 +154,11 @@ func dumpDB(ctx *cli.Context) error { defer outStream.Close() writer := io.NewBinWriterFromIO(outStream) - grace, cancel := context.WithCancel(newGraceContext()) - defer cancel() - chain, err := initBlockChain(cfg) if err != nil { return cli.NewExitError(err, 1) } - go chain.Run(grace) + go chain.Run() chainHeight := chain.BlockHeight() if skip+count > chainHeight { @@ -182,6 +179,7 @@ func dumpDB(ctx *cli.Context) error { return cli.NewExitError(err, 1) } } + chain.Close() return nil } func restoreDB(ctx *cli.Context) error { @@ -204,14 +202,11 @@ func restoreDB(ctx *cli.Context) error { defer inStream.Close() reader := io.NewBinReaderFromIO(inStream) - grace, cancel := context.WithCancel(newGraceContext()) - defer cancel() - chain, err := initBlockChain(cfg) if err != nil { return err } - go chain.Run(grace) + go chain.Run() var allBlocks uint32 reader.ReadLE(&allBlocks) @@ -243,6 +238,7 @@ func restoreDB(ctx *cli.Context) error { return cli.NewExitError(fmt.Errorf("failed to add block %d: %s", i, err), 1) } } + chain.Close() return nil } @@ -272,7 +268,7 @@ func startServer(ctx *cli.Context) error { errChan := make(chan error) monitoring := metrics.NewMetricsService(cfg.ApplicationConfiguration.Monitoring) - go chain.Run(grace) + go chain.Run() go server.Start(errChan) go rpcServer.Start(errChan) go monitoring.Start() @@ -295,6 +291,7 @@ Main: shutdownErr = errors.Wrap(serverErr, "Error encountered whilst shutting down server") } monitoring.ShutDown() + chain.Close() break Main } } diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 8b2ecfa75..9d04377fa 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -2,7 +2,6 @@ package core import ( "bytes" - "context" "fmt" "math" "sort" @@ -65,6 +64,10 @@ type Blockchain struct { headersOp chan headersOpFunc headersOpDone chan struct{} + // Stop synchronization mechanisms. + stopCh chan struct{} + runToExitCh chan struct{} + memPool MemPool } @@ -78,6 +81,8 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha store: storage.NewMemCachedStore(s), headersOp: make(chan headersOpFunc), headersOpDone: make(chan struct{}), + stopCh: make(chan struct{}), + runToExitCh: make(chan struct{}), memPool: NewMemPool(50000), } @@ -178,7 +183,7 @@ func (bc *Blockchain) init() error { } // Run runs chain loop. -func (bc *Blockchain) Run(ctx context.Context) { +func (bc *Blockchain) Run() { persistTimer := time.NewTimer(persistInterval) defer func() { persistTimer.Stop() @@ -188,10 +193,11 @@ func (bc *Blockchain) Run(ctx context.Context) { if err := bc.store.Close(); err != nil { log.Warnf("failed to close db: %s", err) } + close(bc.runToExitCh) }() for { select { - case <-ctx.Done(): + case <-bc.stopCh: return case op := <-bc.headersOp: op(bc.headerList) @@ -208,6 +214,13 @@ func (bc *Blockchain) Run(ctx context.Context) { } } +// Close stops Blockchain's internal loop, syncs changes to persistent storage +// and closes it. The Blockchain is no longer functional after the call to Close. +func (bc *Blockchain) Close() { + close(bc.stopCh) + <-bc.runToExitCh +} + // AddBlock accepts successive block for the Blockchain, verifies it and // stores internally. Eventually it will be persisted to the backing storage. func (bc *Blockchain) AddBlock(block *Block) error { diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index f99114893..82503256b 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -155,3 +155,23 @@ func TestGetTransaction(t *testing.T) { assert.NoError(t, bc.persist()) } } + +func TestClose(t *testing.T) { + defer func() { + r := recover() + assert.NotNil(t, r) + }() + bc := newTestChain(t) + blocks := makeBlocks(10) + for i := 0; i < len(blocks); i++ { + require.NoError(t, bc.AddBlock(blocks[i])) + } + bc.Close() + // It's a hack, but we use internal knowledge of MemoryStore + // implementation which makes it completely unusable (up to panicing) + // after Close(). + _ = bc.store.Put([]byte{0}, []byte{1}) + + // This should never be executed. + assert.Nil(t, t) +} diff --git a/pkg/core/blockchainer.go b/pkg/core/blockchainer.go index 1ce3247e6..c13fd1ae3 100644 --- a/pkg/core/blockchainer.go +++ b/pkg/core/blockchainer.go @@ -15,6 +15,7 @@ type Blockchainer interface { AddHeaders(...*Header) error AddBlock(*Block) error BlockHeight() uint32 + Close() HeaderHeight() uint32 GetBlock(hash util.Uint256) (*Block, error) GetContractState(hash util.Uint160) *ContractState diff --git a/pkg/core/helper_test.go b/pkg/core/helper_test.go index 112057f32..500914660 100644 --- a/pkg/core/helper_test.go +++ b/pkg/core/helper_test.go @@ -1,7 +1,6 @@ package core import ( - "context" "encoding/hex" "encoding/json" "fmt" @@ -41,7 +40,7 @@ func newTestChain(t *testing.T) *Blockchain { if err != nil { t.Fatal(err) } - go chain.Run(context.Background()) + go chain.Run() zeroHash, err := chain.GetHeader(chain.GetHeaderHash(0)) require.Nil(t, err) newBlockPrevHash = zeroHash.Hash() diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 41530a452..4d64656e8 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -52,6 +52,9 @@ func (chain *testChain) AddBlock(block *core.Block) error { func (chain *testChain) BlockHeight() uint32 { return atomic.LoadUint32(&chain.blockheight) } +func (chain *testChain) Close() { + panic("TODO") +} func (chain testChain) HeaderHeight() uint32 { return 0 } diff --git a/pkg/rpc/server_helper_test.go b/pkg/rpc/server_helper_test.go index 42d02d544..a2de55187 100644 --- a/pkg/rpc/server_helper_test.go +++ b/pkg/rpc/server_helper_test.go @@ -1,7 +1,6 @@ package rpc import ( - "context" "net/http" "os" "testing" @@ -139,7 +138,7 @@ type GetAccountStateResponse struct { ID int `json:"id"` } -func initServerWithInMemoryChain(ctx context.Context, t *testing.T) (*core.Blockchain, http.HandlerFunc) { +func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFunc) { var nBlocks uint32 net := config.ModeUnitTestNet @@ -151,7 +150,7 @@ func initServerWithInMemoryChain(ctx context.Context, t *testing.T) (*core.Block chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration) require.NoError(t, err, "could not create chain") - go chain.Run(ctx) + go chain.Run() f, err := os.Open("testdata/50testblocks.acc") require.Nil(t, err) diff --git a/pkg/rpc/server_test.go b/pkg/rpc/server_test.go index 533dd6f04..05d7eacf5 100644 --- a/pkg/rpc/server_test.go +++ b/pkg/rpc/server_test.go @@ -2,7 +2,6 @@ package rpc import ( "bytes" - "context" "encoding/json" "fmt" "io/ioutil" @@ -10,16 +9,12 @@ import ( "net/http/httptest" "strings" "testing" - "time" "github.com/stretchr/testify/assert" ) func TestRPC(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - - chain, handler := initServerWithInMemoryChain(ctx, t) + chain, handler := initServerWithInMemoryChain(t) t.Run("getbestblockhash", func(t *testing.T) { rpc := `{"jsonrpc": "2.0", "id": 1, "method": "getbestblockhash", "params": []}`