diff --git a/cli/server/server.go b/cli/server/server.go index 141e3220a..8eb9a7cf2 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -154,14 +154,11 @@ func dumpDB(ctx *cli.Context) error { defer outStream.Close() writer := io.NewBinWriterFromIO(outStream) - grace, cancel := context.WithCancel(newGraceContext()) - defer cancel() - chain, err := initBlockChain(cfg) if err != nil { return cli.NewExitError(err, 1) } - go chain.Run(grace) + go chain.Run() chainHeight := chain.BlockHeight() if skip+count > chainHeight { @@ -182,6 +179,7 @@ func dumpDB(ctx *cli.Context) error { return cli.NewExitError(err, 1) } } + chain.Close() return nil } func restoreDB(ctx *cli.Context) error { @@ -204,14 +202,11 @@ func restoreDB(ctx *cli.Context) error { defer inStream.Close() reader := io.NewBinReaderFromIO(inStream) - grace, cancel := context.WithCancel(newGraceContext()) - defer cancel() - chain, err := initBlockChain(cfg) if err != nil { return err } - go chain.Run(grace) + go chain.Run() var allBlocks uint32 reader.ReadLE(&allBlocks) @@ -243,6 +238,7 @@ func restoreDB(ctx *cli.Context) error { return cli.NewExitError(fmt.Errorf("failed to add block %d: %s", i, err), 1) } } + chain.Close() return nil } @@ -272,7 +268,7 @@ func startServer(ctx *cli.Context) error { errChan := make(chan error) monitoring := metrics.NewMetricsService(cfg.ApplicationConfiguration.Monitoring) - go chain.Run(grace) + go chain.Run() go server.Start(errChan) go rpcServer.Start(errChan) go monitoring.Start() @@ -295,6 +291,7 @@ Main: shutdownErr = errors.Wrap(serverErr, "Error encountered whilst shutting down server") } monitoring.ShutDown() + chain.Close() break Main } } diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 8b2ecfa75..9d04377fa 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -2,7 +2,6 @@ package core import ( "bytes" - "context" "fmt" "math" "sort" @@ -65,6 +64,10 @@ type Blockchain struct { headersOp chan headersOpFunc headersOpDone chan struct{} + // Stop synchronization mechanisms. + stopCh chan struct{} + runToExitCh chan struct{} + memPool MemPool } @@ -78,6 +81,8 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha store: storage.NewMemCachedStore(s), headersOp: make(chan headersOpFunc), headersOpDone: make(chan struct{}), + stopCh: make(chan struct{}), + runToExitCh: make(chan struct{}), memPool: NewMemPool(50000), } @@ -178,7 +183,7 @@ func (bc *Blockchain) init() error { } // Run runs chain loop. -func (bc *Blockchain) Run(ctx context.Context) { +func (bc *Blockchain) Run() { persistTimer := time.NewTimer(persistInterval) defer func() { persistTimer.Stop() @@ -188,10 +193,11 @@ func (bc *Blockchain) Run(ctx context.Context) { if err := bc.store.Close(); err != nil { log.Warnf("failed to close db: %s", err) } + close(bc.runToExitCh) }() for { select { - case <-ctx.Done(): + case <-bc.stopCh: return case op := <-bc.headersOp: op(bc.headerList) @@ -208,6 +214,13 @@ func (bc *Blockchain) Run(ctx context.Context) { } } +// Close stops Blockchain's internal loop, syncs changes to persistent storage +// and closes it. The Blockchain is no longer functional after the call to Close. +func (bc *Blockchain) Close() { + close(bc.stopCh) + <-bc.runToExitCh +} + // AddBlock accepts successive block for the Blockchain, verifies it and // stores internally. Eventually it will be persisted to the backing storage. func (bc *Blockchain) AddBlock(block *Block) error { diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index f99114893..82503256b 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -155,3 +155,23 @@ func TestGetTransaction(t *testing.T) { assert.NoError(t, bc.persist()) } } + +func TestClose(t *testing.T) { + defer func() { + r := recover() + assert.NotNil(t, r) + }() + bc := newTestChain(t) + blocks := makeBlocks(10) + for i := 0; i < len(blocks); i++ { + require.NoError(t, bc.AddBlock(blocks[i])) + } + bc.Close() + // It's a hack, but we use internal knowledge of MemoryStore + // implementation which makes it completely unusable (up to panicing) + // after Close(). + _ = bc.store.Put([]byte{0}, []byte{1}) + + // This should never be executed. + assert.Nil(t, t) +} diff --git a/pkg/core/blockchainer.go b/pkg/core/blockchainer.go index 1ce3247e6..c13fd1ae3 100644 --- a/pkg/core/blockchainer.go +++ b/pkg/core/blockchainer.go @@ -15,6 +15,7 @@ type Blockchainer interface { AddHeaders(...*Header) error AddBlock(*Block) error BlockHeight() uint32 + Close() HeaderHeight() uint32 GetBlock(hash util.Uint256) (*Block, error) GetContractState(hash util.Uint160) *ContractState diff --git a/pkg/core/helper_test.go b/pkg/core/helper_test.go index 112057f32..500914660 100644 --- a/pkg/core/helper_test.go +++ b/pkg/core/helper_test.go @@ -1,7 +1,6 @@ package core import ( - "context" "encoding/hex" "encoding/json" "fmt" @@ -41,7 +40,7 @@ func newTestChain(t *testing.T) *Blockchain { if err != nil { t.Fatal(err) } - go chain.Run(context.Background()) + go chain.Run() zeroHash, err := chain.GetHeader(chain.GetHeaderHash(0)) require.Nil(t, err) newBlockPrevHash = zeroHash.Hash() diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 41530a452..4d64656e8 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -52,6 +52,9 @@ func (chain *testChain) AddBlock(block *core.Block) error { func (chain *testChain) BlockHeight() uint32 { return atomic.LoadUint32(&chain.blockheight) } +func (chain *testChain) Close() { + panic("TODO") +} func (chain testChain) HeaderHeight() uint32 { return 0 } diff --git a/pkg/rpc/server_helper_test.go b/pkg/rpc/server_helper_test.go index 42d02d544..a2de55187 100644 --- a/pkg/rpc/server_helper_test.go +++ b/pkg/rpc/server_helper_test.go @@ -1,7 +1,6 @@ package rpc import ( - "context" "net/http" "os" "testing" @@ -139,7 +138,7 @@ type GetAccountStateResponse struct { ID int `json:"id"` } -func initServerWithInMemoryChain(ctx context.Context, t *testing.T) (*core.Blockchain, http.HandlerFunc) { +func initServerWithInMemoryChain(t *testing.T) (*core.Blockchain, http.HandlerFunc) { var nBlocks uint32 net := config.ModeUnitTestNet @@ -151,7 +150,7 @@ func initServerWithInMemoryChain(ctx context.Context, t *testing.T) (*core.Block chain, err := core.NewBlockchain(memoryStore, cfg.ProtocolConfiguration) require.NoError(t, err, "could not create chain") - go chain.Run(ctx) + go chain.Run() f, err := os.Open("testdata/50testblocks.acc") require.Nil(t, err) diff --git a/pkg/rpc/server_test.go b/pkg/rpc/server_test.go index 533dd6f04..05d7eacf5 100644 --- a/pkg/rpc/server_test.go +++ b/pkg/rpc/server_test.go @@ -2,7 +2,6 @@ package rpc import ( "bytes" - "context" "encoding/json" "fmt" "io/ioutil" @@ -10,16 +9,12 @@ import ( "net/http/httptest" "strings" "testing" - "time" "github.com/stretchr/testify/assert" ) func TestRPC(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - - chain, handler := initServerWithInMemoryChain(ctx, t) + chain, handler := initServerWithInMemoryChain(t) t.Run("getbestblockhash", func(t *testing.T) { rpc := `{"jsonrpc": "2.0", "id": 1, "method": "getbestblockhash", "params": []}`