Blockchain graceful shutdown (#139)

* Blockchain graceful shutdown

- fix #138
- stop blockchain goroutine
- close leveldb database

* fix possible context leak (go vet)
This commit is contained in:
Evgeniy Kulikov 2019-02-19 14:48:48 +03:00 committed by fabwa
parent c0a5c100ca
commit 9c24bf9139
5 changed files with 53 additions and 19 deletions

View file

@ -1,6 +1,7 @@
package server package server
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"os/signal" "os/signal"
@ -30,6 +31,17 @@ func NewCommand() cli.Command {
} }
} }
func newGraceContext() context.Context {
ctx, cancel := context.WithCancel(context.Background())
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)
go func() {
<-stop
cancel()
}()
return ctx
}
func startServer(ctx *cli.Context) error { func startServer(ctx *cli.Context) error {
net := config.ModePrivNet net := config.ModePrivNet
if ctx.Bool("testnet") { if ctx.Bool("testnet") {
@ -39,6 +51,9 @@ func startServer(ctx *cli.Context) error {
net = config.ModeMainNet net = config.ModeMainNet
} }
grace, cancel := context.WithCancel(newGraceContext())
defer cancel()
configPath := "../config" configPath := "../config"
if argCp := ctx.String("config-path"); argCp != "" { if argCp := ctx.String("config-path"); argCp != "" {
configPath = argCp configPath = argCp
@ -48,11 +63,8 @@ func startServer(ctx *cli.Context) error {
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
} }
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt)
serverConfig := network.NewServerConfig(cfg) serverConfig := network.NewServerConfig(cfg)
chain, err := core.NewBlockchainLevelDB(cfg) chain, err := core.NewBlockchainLevelDB(grace, cfg)
if err != nil { if err != nil {
err = fmt.Errorf("could not initialize blockchain: %s", err) err = fmt.Errorf("could not initialize blockchain: %s", err)
return cli.NewExitError(err, 1) return cli.NewExitError(err, 1)
@ -79,9 +91,9 @@ Main:
select { select {
case err := <-errChan: case err := <-errChan:
shutdownErr = errors.Wrap(err, "Error encountered by server") shutdownErr = errors.Wrap(err, "Error encountered by server")
interruptChan <- os.Kill cancel()
case <-interruptChan: case <-grace.Done():
server.Shutdown() server.Shutdown()
if serverErr := rpcServer.Shutdown(); serverErr != nil { if serverErr := rpcServer.Shutdown(); serverErr != nil {
shutdownErr = errors.Wrap(serverErr, "Error encountered whilst shutting down server") shutdownErr = errors.Wrap(serverErr, "Error encountered whilst shutting down server")

View file

@ -2,6 +2,7 @@ package core
import ( import (
"bytes" "bytes"
"context"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"sync/atomic" "sync/atomic"
@ -60,7 +61,7 @@ type headersOpFunc func(headerList *HeaderHashList)
// NewBlockchain return a new blockchain object the will use the // NewBlockchain return a new blockchain object the will use the
// given Store as its underlying storage. // given Store as its underlying storage.
func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockchain, error) { func NewBlockchain(ctx context.Context, s storage.Store, cfg config.ProtocolConfiguration) (*Blockchain, error) {
bc := &Blockchain{ bc := &Blockchain{
config: cfg, config: cfg,
Store: s, Store: s,
@ -69,7 +70,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha
blockCache: NewCache(), blockCache: NewCache(),
verifyBlocks: false, verifyBlocks: false,
} }
go bc.run() go bc.run(ctx)
if err := bc.init(); err != nil { if err := bc.init(); err != nil {
return nil, err return nil, err
@ -79,8 +80,9 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration) (*Blockcha
} }
// GetBlockchainLevelDB returns blockchain based on configuration // GetBlockchainLevelDB returns blockchain based on configuration
func NewBlockchainLevelDB(cfg config.Config) (*Blockchain, error) { func NewBlockchainLevelDB(ctx context.Context, cfg config.Config) (*Blockchain, error) {
store, err := storage.NewLevelDBStore( store, err := storage.NewLevelDBStore(
ctx,
cfg.ApplicationConfiguration.DataDirectoryPath, cfg.ApplicationConfiguration.DataDirectoryPath,
nil, nil,
) )
@ -88,7 +90,7 @@ func NewBlockchainLevelDB(cfg config.Config) (*Blockchain, error) {
return nil, err return nil, err
} }
return NewBlockchain(store, cfg.ProtocolConfiguration) return NewBlockchain(ctx, store, cfg.ProtocolConfiguration)
} }
func (bc *Blockchain) init() error { func (bc *Blockchain) init() error {
@ -165,15 +167,18 @@ func (bc *Blockchain) init() error {
return nil return nil
} }
func (bc *Blockchain) run() { func (bc *Blockchain) run(ctx context.Context) {
persistTimer := time.NewTimer(persistInterval) persistTimer := time.NewTimer(persistInterval)
defer persistTimer.Stop()
for { for {
select { select {
case <-ctx.Done():
return
case op := <-bc.headersOp: case op := <-bc.headersOp:
op(bc.headerList) op(bc.headerList)
bc.headersOpDone <- struct{}{} bc.headersOpDone <- struct{}{}
case <-persistTimer.C: case <-persistTimer.C:
go bc.persist() go bc.persist(ctx)
persistTimer.Reset(persistInterval) persistTimer.Reset(persistInterval)
} }
} }
@ -395,7 +400,7 @@ func (bc *Blockchain) persistBlock(block *Block) error {
return nil return nil
} }
func (bc *Blockchain) persist() (err error) { func (bc *Blockchain) persist(ctx context.Context) (err error) {
var ( var (
start = time.Now() start = time.Now()
persisted = 0 persisted = 0
@ -422,7 +427,13 @@ func (bc *Blockchain) persist() (err error) {
} }
} }
} }
<-bc.headersOpDone
select {
case <-ctx.Done():
return
case <-bc.headersOpDone:
//
}
if persisted > 0 { if persisted > 0 {
log.WithFields(log.Fields{ log.WithFields(log.Fields{

View file

@ -1,6 +1,7 @@
package core package core
import ( import (
"context"
"testing" "testing"
"github.com/CityOfZion/neo-go/config" "github.com/CityOfZion/neo-go/config"
@ -54,7 +55,7 @@ func TestAddBlock(t *testing.T) {
t.Log(bc.blockCache) t.Log(bc.blockCache)
if err := bc.persist(); err != nil { if err := bc.persist(context.Background()); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -118,7 +119,7 @@ func TestHasBlock(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
assert.Nil(t, bc.persist()) assert.Nil(t, bc.persist(context.Background()))
for i := 0; i < len(blocks); i++ { for i := 0; i < len(blocks); i++ {
assert.True(t, bc.HasBlock(blocks[i].Hash())) assert.True(t, bc.HasBlock(blocks[i].Hash()))
@ -148,7 +149,7 @@ func newTestChain(t *testing.T) *Blockchain {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
chain, err := NewBlockchain(storage.NewMemoryStore(), cfg.ProtocolConfiguration) chain, err := NewBlockchain(context.Background(), storage.NewMemoryStore(), cfg.ProtocolConfiguration)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View file

@ -1,6 +1,8 @@
package storage package storage
import ( import (
"context"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util" "github.com/syndtr/goleveldb/leveldb/util"
@ -15,11 +17,18 @@ type LevelDBStore struct {
// NewLevelDBStore return a new LevelDBStore object that will // NewLevelDBStore return a new LevelDBStore object that will
// initialize the database found at the given path. // initialize the database found at the given path.
func NewLevelDBStore(path string, opts *opt.Options) (*LevelDBStore, error) { func NewLevelDBStore(ctx context.Context, path string, opts *opt.Options) (*LevelDBStore, error) {
db, err := leveldb.OpenFile(path, opts) db, err := leveldb.OpenFile(path, opts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// graceful shutdown
go func() {
<-ctx.Done()
db.Close()
}()
return &LevelDBStore{ return &LevelDBStore{
path: path, path: path,
db: db, db: db,

View file

@ -2,6 +2,7 @@ package rpc
import ( import (
"bytes" "bytes"
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -25,7 +26,7 @@ func TestHandler(t *testing.T) {
t.Fatal("could not create levelDB chain", err) t.Fatal("could not create levelDB chain", err)
} }
chain, err := core.NewBlockchainLevelDB(cfg) chain, err := core.NewBlockchainLevelDB(context.Background(), cfg)
if err != nil { if err != nil {
t.Fatal("could not create levelDB chain", err) t.Fatal("could not create levelDB chain", err)
} }