From 264dfef370e6f082fc1b7c66faf1a9eb07f97b2f Mon Sep 17 00:00:00 2001 From: Vsevolod Brekelov Date: Mon, 16 Sep 2019 18:52:47 +0300 Subject: [PATCH] storage: close function add close function to storage interface add common defer function call which will close db connection remove context as soon as it's not needed anymore updated unit tests --- cli/server/server.go | 2 +- pkg/core/blockchain.go | 7 ++++++- pkg/core/storage/boltdb_store.go | 9 +-------- pkg/core/storage/boltdb_store_test.go | 3 +-- pkg/core/storage/leveldb_store.go | 15 ++++++--------- pkg/core/storage/memory_store.go | 8 ++++++++ pkg/core/storage/memory_store_test.go | 4 ++++ pkg/core/storage/redis_store.go | 5 +++++ pkg/core/storage/redis_store_test.go | 5 +++++ pkg/core/storage/store.go | 8 ++++---- pkg/rpc/server_test.go | 3 +-- 11 files changed, 42 insertions(+), 27 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index ef5095d0b..6c4e48f37 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -112,7 +112,7 @@ Main: // initBlockChain initializes BlockChain with preselected DB. func initBlockChain(context context.Context, cfg config.Config) (*core.Blockchain, error) { - store, err := storage.NewStore(context, cfg.ApplicationConfiguration.DBConfiguration) + store, err := storage.NewStore(cfg.ApplicationConfiguration.DBConfiguration) if err != nil { return nil, cli.NewExitError(fmt.Errorf("could not initialize storage: %s", err), 1) } diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index f37e8441e..0b5f65c5a 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -154,7 +154,12 @@ func (bc *Blockchain) init() error { func (bc *Blockchain) run(ctx context.Context) { persistTimer := time.NewTimer(persistInterval) - defer persistTimer.Stop() + defer func() { + persistTimer.Stop() + if err := bc.Store.Close(); err != nil { + log.Warnf("failed to close db: %s", err) + } + }() for { select { case <-ctx.Done(): diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index b0c18c7c7..e63ec95e0 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -2,7 +2,6 @@ package storage import ( "bytes" - "context" "fmt" "os" "path" @@ -41,7 +40,7 @@ func (b *BoltDBBatch) Put(k, v []byte) { } // NewBoltDBStore returns a new ready to use BoltDB storage with created bucket. -func NewBoltDBStore(ctx context.Context, cfg BoltDBOptions) (*BoltDBStore, error) { +func NewBoltDBStore(cfg BoltDBOptions) (*BoltDBStore, error) { var opts *bbolt.Options // should be exposed via BoltDBOptions if anything needed fileMode := os.FileMode(0600) // should be exposed via BoltDBOptions if anything needed fileName := cfg.FilePath @@ -62,12 +61,6 @@ func NewBoltDBStore(ctx context.Context, cfg BoltDBOptions) (*BoltDBStore, error return nil }) - // graceful shutdown - go func() { - <-ctx.Done() - db.Close() - }() - return &BoltDBStore{db: db}, nil } diff --git a/pkg/core/storage/boltdb_store_test.go b/pkg/core/storage/boltdb_store_test.go index 73dc4097d..51128b749 100644 --- a/pkg/core/storage/boltdb_store_test.go +++ b/pkg/core/storage/boltdb_store_test.go @@ -1,7 +1,6 @@ package storage import ( - "context" "io/ioutil" "os" "reflect" @@ -83,7 +82,7 @@ func openStore(t *testing.T) *BoltDBStore { }() require.NoError(t, err) require.NoError(t, file.Close()) - boltDBStore, err := NewBoltDBStore(context.Background(), BoltDBOptions{FilePath: testFileName}) + boltDBStore, err := NewBoltDBStore(BoltDBOptions{FilePath: testFileName}) require.NoError(t, err) return boltDBStore } diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index 80352ccf2..1bc6e013b 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -1,8 +1,6 @@ package storage import ( - "context" - "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/util" @@ -22,7 +20,7 @@ type LevelDBStore struct { // NewLevelDBStore return a new LevelDBStore object that will // initialize the database found at the given path. -func NewLevelDBStore(ctx context.Context, cfg LevelDBOptions) (*LevelDBStore, error) { +func NewLevelDBStore(cfg LevelDBOptions) (*LevelDBStore, error) { var opts *opt.Options = nil // should be exposed via LevelDBOptions if anything needed db, err := leveldb.OpenFile(cfg.DataDirectoryPath, opts) @@ -30,12 +28,6 @@ func NewLevelDBStore(ctx context.Context, cfg LevelDBOptions) (*LevelDBStore, er return nil, err } - // graceful shutdown - go func() { - <-ctx.Done() - db.Close() - }() - return &LevelDBStore{ path: cfg.DataDirectoryPath, db: db, @@ -72,3 +64,8 @@ func (s *LevelDBStore) Seek(key []byte, f func(k, v []byte)) { func (s *LevelDBStore) Batch() Batch { return new(leveldb.Batch) } + +// Close implements the Store interface. +func (s *LevelDBStore) Close() error { + return s.db.Close() +} diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index e5fae863b..2e155b84c 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -76,6 +76,14 @@ func (s *MemoryStore) Batch() Batch { } } +// Close implements Store interface and clears up memory. +func (s *MemoryStore) Close() error { + s.Lock() + s.mem = nil + s.Unlock() + return nil +} + func makeKey(k []byte) string { return hex.EncodeToString(k) } diff --git a/pkg/core/storage/memory_store_test.go b/pkg/core/storage/memory_store_test.go index 8d1132468..6d44bb433 100644 --- a/pkg/core/storage/memory_store_test.go +++ b/pkg/core/storage/memory_store_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestGetPut(t *testing.T) { @@ -22,6 +23,7 @@ func TestGetPut(t *testing.T) { t.Fatal(err) } assert.Equal(t, value, newVal) + require.NoError(t, s.Close()) } func TestKeyNotExist(t *testing.T) { @@ -33,6 +35,7 @@ func TestKeyNotExist(t *testing.T) { _, err := s.Get(key) assert.NotNil(t, err) assert.Equal(t, err.Error(), "key not found") + require.NoError(t, s.Close()) } func TestPutBatch(t *testing.T) { @@ -54,4 +57,5 @@ func TestPutBatch(t *testing.T) { t.Fatal(err) } assert.Equal(t, value, newVal) + require.NoError(t, s.Close()) } diff --git a/pkg/core/storage/redis_store.go b/pkg/core/storage/redis_store.go index 77bd55161..a401ce08d 100644 --- a/pkg/core/storage/redis_store.go +++ b/pkg/core/storage/redis_store.go @@ -92,3 +92,8 @@ func (s *RedisStore) Seek(k []byte, f func(k, v []byte)) { f([]byte(key), []byte(val)) } } + +// Close implements the Store interface. +func (s *RedisStore) Close() error { + return s.client.Close() +} diff --git a/pkg/core/storage/redis_store_test.go b/pkg/core/storage/redis_store_test.go index c7a35165d..12403b974 100644 --- a/pkg/core/storage/redis_store_test.go +++ b/pkg/core/storage/redis_store_test.go @@ -6,6 +6,7 @@ import ( "github.com/alicebob/miniredis" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestNewRedisBatch(t *testing.T) { @@ -26,6 +27,7 @@ func TestNewRedisStore(t *testing.T) { assert.Nil(t, err, "NewRedisStore Get error") assert.Equal(t, value, result) + require.NoError(t, redisStore.Close()) redisMock.Close() } @@ -123,6 +125,7 @@ func TestRedisStore_GetAndPut(t *testing.T) { redisMock.FlushDB() }) } + require.NoError(t, redisStore.Close()) redisMock.Close() } @@ -134,6 +137,7 @@ func TestRedisStore_PutBatch(t *testing.T) { result, err := redisStore.Get([]byte("foo1")) assert.Nil(t, err) assert.Equal(t, []byte("bar1"), result) + require.NoError(t, redisStore.Close()) mock.Close() } @@ -142,6 +146,7 @@ func TestRedisStore_Seek(t *testing.T) { redisStore.Seek([]byte("foo"), func(k, v []byte) { assert.Equal(t, []byte("bar"), v) }) + require.NoError(t, redisStore.Close()) mock.Close() } diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 1873339b9..92fdeab07 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -1,7 +1,6 @@ package storage import ( - "context" "encoding/binary" "errors" ) @@ -37,6 +36,7 @@ type ( Put(k, v []byte) error PutBatch(Batch) error Seek(k []byte, f func(k, v []byte)) + Close() error } // Batch represents an abstraction on top of batch operations. @@ -75,18 +75,18 @@ func AppendPrefixInt(k KeyPrefix, n int) []byte { } // NewStore creates storage with preselected in configuration database type. -func NewStore(context context.Context, cfg DBConfiguration) (Store, error) { +func NewStore(cfg DBConfiguration) (Store, error) { var store Store var err error switch cfg.Type { case "leveldb": - store, err = NewLevelDBStore(context, cfg.LevelDBOptions) + store, err = NewLevelDBStore(cfg.LevelDBOptions) case "inmemory": store = NewMemoryStore() case "redis": store, err = NewRedisStore(cfg.RedisDBOptions) case "boltdb": - store, err = NewBoltDBStore(context, cfg.BoltDBOptions) + store, err = NewBoltDBStore(cfg.BoltDBOptions) } return store, err } diff --git a/pkg/rpc/server_test.go b/pkg/rpc/server_test.go index 2ffb16f04..d891ffe30 100644 --- a/pkg/rpc/server_test.go +++ b/pkg/rpc/server_test.go @@ -246,8 +246,7 @@ func TestHandler(t *testing.T) { cfg, err := config.Load(configPath, net) require.NoError(t, err, "could not load config") - store, err := storage.NewLevelDBStore(context.Background(), - cfg.ApplicationConfiguration.DBConfiguration.LevelDBOptions) + store, err := storage.NewLevelDBStore(cfg.ApplicationConfiguration.DBConfiguration.LevelDBOptions) assert.Nil(t, err) chain, err := core.NewBlockchain(context.Background(), store, cfg.ProtocolConfiguration) require.NoError(t, err, "could not create levelDB chain")