storage: close function
add close function to storage interface add common defer function call which will close db connection remove context as soon as it's not needed anymore updated unit tests
This commit is contained in:
parent
b21a220712
commit
264dfef370
11 changed files with 42 additions and 27 deletions
|
@ -112,7 +112,7 @@ Main:
|
|||
|
||||
// initBlockChain initializes BlockChain with preselected DB.
|
||||
func initBlockChain(context context.Context, cfg config.Config) (*core.Blockchain, error) {
|
||||
store, err := storage.NewStore(context, cfg.ApplicationConfiguration.DBConfiguration)
|
||||
store, err := storage.NewStore(cfg.ApplicationConfiguration.DBConfiguration)
|
||||
if err != nil {
|
||||
return nil, cli.NewExitError(fmt.Errorf("could not initialize storage: %s", err), 1)
|
||||
}
|
||||
|
|
|
@ -154,7 +154,12 @@ func (bc *Blockchain) init() error {
|
|||
|
||||
func (bc *Blockchain) run(ctx context.Context) {
|
||||
persistTimer := time.NewTimer(persistInterval)
|
||||
defer persistTimer.Stop()
|
||||
defer func() {
|
||||
persistTimer.Stop()
|
||||
if err := bc.Store.Close(); err != nil {
|
||||
log.Warnf("failed to close db: %s", err)
|
||||
}
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -2,7 +2,6 @@ package storage
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -41,7 +40,7 @@ func (b *BoltDBBatch) Put(k, v []byte) {
|
|||
}
|
||||
|
||||
// NewBoltDBStore returns a new ready to use BoltDB storage with created bucket.
|
||||
func NewBoltDBStore(ctx context.Context, cfg BoltDBOptions) (*BoltDBStore, error) {
|
||||
func NewBoltDBStore(cfg BoltDBOptions) (*BoltDBStore, error) {
|
||||
var opts *bbolt.Options // should be exposed via BoltDBOptions if anything needed
|
||||
fileMode := os.FileMode(0600) // should be exposed via BoltDBOptions if anything needed
|
||||
fileName := cfg.FilePath
|
||||
|
@ -62,12 +61,6 @@ func NewBoltDBStore(ctx context.Context, cfg BoltDBOptions) (*BoltDBStore, error
|
|||
return nil
|
||||
})
|
||||
|
||||
// graceful shutdown
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
db.Close()
|
||||
}()
|
||||
|
||||
return &BoltDBStore{db: db}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
|
@ -83,7 +82,7 @@ func openStore(t *testing.T) *BoltDBStore {
|
|||
}()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, file.Close())
|
||||
boltDBStore, err := NewBoltDBStore(context.Background(), BoltDBOptions{FilePath: testFileName})
|
||||
boltDBStore, err := NewBoltDBStore(BoltDBOptions{FilePath: testFileName})
|
||||
require.NoError(t, err)
|
||||
return boltDBStore
|
||||
}
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
|
@ -22,7 +20,7 @@ type LevelDBStore struct {
|
|||
|
||||
// NewLevelDBStore return a new LevelDBStore object that will
|
||||
// initialize the database found at the given path.
|
||||
func NewLevelDBStore(ctx context.Context, cfg LevelDBOptions) (*LevelDBStore, error) {
|
||||
func NewLevelDBStore(cfg LevelDBOptions) (*LevelDBStore, error) {
|
||||
var opts *opt.Options = nil // should be exposed via LevelDBOptions if anything needed
|
||||
|
||||
db, err := leveldb.OpenFile(cfg.DataDirectoryPath, opts)
|
||||
|
@ -30,12 +28,6 @@ func NewLevelDBStore(ctx context.Context, cfg LevelDBOptions) (*LevelDBStore, er
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// graceful shutdown
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
db.Close()
|
||||
}()
|
||||
|
||||
return &LevelDBStore{
|
||||
path: cfg.DataDirectoryPath,
|
||||
db: db,
|
||||
|
@ -72,3 +64,8 @@ func (s *LevelDBStore) Seek(key []byte, f func(k, v []byte)) {
|
|||
func (s *LevelDBStore) Batch() Batch {
|
||||
return new(leveldb.Batch)
|
||||
}
|
||||
|
||||
// Close implements the Store interface.
|
||||
func (s *LevelDBStore) Close() error {
|
||||
return s.db.Close()
|
||||
}
|
||||
|
|
|
@ -76,6 +76,14 @@ func (s *MemoryStore) Batch() Batch {
|
|||
}
|
||||
}
|
||||
|
||||
// Close implements Store interface and clears up memory.
|
||||
func (s *MemoryStore) Close() error {
|
||||
s.Lock()
|
||||
s.mem = nil
|
||||
s.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeKey(k []byte) string {
|
||||
return hex.EncodeToString(k)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGetPut(t *testing.T) {
|
||||
|
@ -22,6 +23,7 @@ func TestGetPut(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
assert.Equal(t, value, newVal)
|
||||
require.NoError(t, s.Close())
|
||||
}
|
||||
|
||||
func TestKeyNotExist(t *testing.T) {
|
||||
|
@ -33,6 +35,7 @@ func TestKeyNotExist(t *testing.T) {
|
|||
_, err := s.Get(key)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, err.Error(), "key not found")
|
||||
require.NoError(t, s.Close())
|
||||
}
|
||||
|
||||
func TestPutBatch(t *testing.T) {
|
||||
|
@ -54,4 +57,5 @@ func TestPutBatch(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
assert.Equal(t, value, newVal)
|
||||
require.NoError(t, s.Close())
|
||||
}
|
||||
|
|
|
@ -92,3 +92,8 @@ func (s *RedisStore) Seek(k []byte, f func(k, v []byte)) {
|
|||
f([]byte(key), []byte(val))
|
||||
}
|
||||
}
|
||||
|
||||
// Close implements the Store interface.
|
||||
func (s *RedisStore) Close() error {
|
||||
return s.client.Close()
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/alicebob/miniredis"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewRedisBatch(t *testing.T) {
|
||||
|
@ -26,6 +27,7 @@ func TestNewRedisStore(t *testing.T) {
|
|||
assert.Nil(t, err, "NewRedisStore Get error")
|
||||
|
||||
assert.Equal(t, value, result)
|
||||
require.NoError(t, redisStore.Close())
|
||||
redisMock.Close()
|
||||
}
|
||||
|
||||
|
@ -123,6 +125,7 @@ func TestRedisStore_GetAndPut(t *testing.T) {
|
|||
redisMock.FlushDB()
|
||||
})
|
||||
}
|
||||
require.NoError(t, redisStore.Close())
|
||||
redisMock.Close()
|
||||
}
|
||||
|
||||
|
@ -134,6 +137,7 @@ func TestRedisStore_PutBatch(t *testing.T) {
|
|||
result, err := redisStore.Get([]byte("foo1"))
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, []byte("bar1"), result)
|
||||
require.NoError(t, redisStore.Close())
|
||||
mock.Close()
|
||||
}
|
||||
|
||||
|
@ -142,6 +146,7 @@ func TestRedisStore_Seek(t *testing.T) {
|
|||
redisStore.Seek([]byte("foo"), func(k, v []byte) {
|
||||
assert.Equal(t, []byte("bar"), v)
|
||||
})
|
||||
require.NoError(t, redisStore.Close())
|
||||
mock.Close()
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
)
|
||||
|
@ -37,6 +36,7 @@ type (
|
|||
Put(k, v []byte) error
|
||||
PutBatch(Batch) error
|
||||
Seek(k []byte, f func(k, v []byte))
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Batch represents an abstraction on top of batch operations.
|
||||
|
@ -75,18 +75,18 @@ func AppendPrefixInt(k KeyPrefix, n int) []byte {
|
|||
}
|
||||
|
||||
// NewStore creates storage with preselected in configuration database type.
|
||||
func NewStore(context context.Context, cfg DBConfiguration) (Store, error) {
|
||||
func NewStore(cfg DBConfiguration) (Store, error) {
|
||||
var store Store
|
||||
var err error
|
||||
switch cfg.Type {
|
||||
case "leveldb":
|
||||
store, err = NewLevelDBStore(context, cfg.LevelDBOptions)
|
||||
store, err = NewLevelDBStore(cfg.LevelDBOptions)
|
||||
case "inmemory":
|
||||
store = NewMemoryStore()
|
||||
case "redis":
|
||||
store, err = NewRedisStore(cfg.RedisDBOptions)
|
||||
case "boltdb":
|
||||
store, err = NewBoltDBStore(context, cfg.BoltDBOptions)
|
||||
store, err = NewBoltDBStore(cfg.BoltDBOptions)
|
||||
}
|
||||
return store, err
|
||||
}
|
||||
|
|
|
@ -246,8 +246,7 @@ func TestHandler(t *testing.T) {
|
|||
cfg, err := config.Load(configPath, net)
|
||||
require.NoError(t, err, "could not load config")
|
||||
|
||||
store, err := storage.NewLevelDBStore(context.Background(),
|
||||
cfg.ApplicationConfiguration.DBConfiguration.LevelDBOptions)
|
||||
store, err := storage.NewLevelDBStore(cfg.ApplicationConfiguration.DBConfiguration.LevelDBOptions)
|
||||
assert.Nil(t, err)
|
||||
chain, err := core.NewBlockchain(context.Background(), store, cfg.ProtocolConfiguration)
|
||||
require.NoError(t, err, "could not create levelDB chain")
|
||||
|
|
Loading…
Reference in a new issue