Merge pull request #399 from nspcc-dev/refactor_store
storage: refactor store, add Close()
This commit is contained in:
commit
c98a626871
11 changed files with 42 additions and 27 deletions
|
@ -112,7 +112,7 @@ Main:
|
|||
|
||||
// initBlockChain initializes BlockChain with preselected DB.
|
||||
func initBlockChain(context context.Context, cfg config.Config) (*core.Blockchain, error) {
|
||||
store, err := storage.NewStore(context, cfg.ApplicationConfiguration.DBConfiguration)
|
||||
store, err := storage.NewStore(cfg.ApplicationConfiguration.DBConfiguration)
|
||||
if err != nil {
|
||||
return nil, cli.NewExitError(fmt.Errorf("could not initialize storage: %s", err), 1)
|
||||
}
|
||||
|
|
|
@ -154,7 +154,12 @@ func (bc *Blockchain) init() error {
|
|||
|
||||
func (bc *Blockchain) run(ctx context.Context) {
|
||||
persistTimer := time.NewTimer(persistInterval)
|
||||
defer persistTimer.Stop()
|
||||
defer func() {
|
||||
persistTimer.Stop()
|
||||
if err := bc.Store.Close(); err != nil {
|
||||
log.Warnf("failed to close db: %s", err)
|
||||
}
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
|
@ -2,7 +2,6 @@ package storage
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -41,7 +40,7 @@ func (b *BoltDBBatch) Put(k, v []byte) {
|
|||
}
|
||||
|
||||
// NewBoltDBStore returns a new ready to use BoltDB storage with created bucket.
|
||||
func NewBoltDBStore(ctx context.Context, cfg BoltDBOptions) (*BoltDBStore, error) {
|
||||
func NewBoltDBStore(cfg BoltDBOptions) (*BoltDBStore, error) {
|
||||
var opts *bbolt.Options // should be exposed via BoltDBOptions if anything needed
|
||||
fileMode := os.FileMode(0600) // should be exposed via BoltDBOptions if anything needed
|
||||
fileName := cfg.FilePath
|
||||
|
@ -62,12 +61,6 @@ func NewBoltDBStore(ctx context.Context, cfg BoltDBOptions) (*BoltDBStore, error
|
|||
return nil
|
||||
})
|
||||
|
||||
// graceful shutdown
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
db.Close()
|
||||
}()
|
||||
|
||||
return &BoltDBStore{db: db}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
|
@ -83,7 +82,7 @@ func openStore(t *testing.T) *BoltDBStore {
|
|||
}()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, file.Close())
|
||||
boltDBStore, err := NewBoltDBStore(context.Background(), BoltDBOptions{FilePath: testFileName})
|
||||
boltDBStore, err := NewBoltDBStore(BoltDBOptions{FilePath: testFileName})
|
||||
require.NoError(t, err)
|
||||
return boltDBStore
|
||||
}
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
|
@ -22,7 +20,7 @@ type LevelDBStore struct {
|
|||
|
||||
// NewLevelDBStore return a new LevelDBStore object that will
|
||||
// initialize the database found at the given path.
|
||||
func NewLevelDBStore(ctx context.Context, cfg LevelDBOptions) (*LevelDBStore, error) {
|
||||
func NewLevelDBStore(cfg LevelDBOptions) (*LevelDBStore, error) {
|
||||
var opts *opt.Options = nil // should be exposed via LevelDBOptions if anything needed
|
||||
|
||||
db, err := leveldb.OpenFile(cfg.DataDirectoryPath, opts)
|
||||
|
@ -30,12 +28,6 @@ func NewLevelDBStore(ctx context.Context, cfg LevelDBOptions) (*LevelDBStore, er
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// graceful shutdown
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
db.Close()
|
||||
}()
|
||||
|
||||
return &LevelDBStore{
|
||||
path: cfg.DataDirectoryPath,
|
||||
db: db,
|
||||
|
@ -72,3 +64,8 @@ func (s *LevelDBStore) Seek(key []byte, f func(k, v []byte)) {
|
|||
func (s *LevelDBStore) Batch() Batch {
|
||||
return new(leveldb.Batch)
|
||||
}
|
||||
|
||||
// Close implements the Store interface.
|
||||
func (s *LevelDBStore) Close() error {
|
||||
return s.db.Close()
|
||||
}
|
||||
|
|
|
@ -76,6 +76,14 @@ func (s *MemoryStore) Batch() Batch {
|
|||
}
|
||||
}
|
||||
|
||||
// Close implements Store interface and clears up memory.
|
||||
func (s *MemoryStore) Close() error {
|
||||
s.Lock()
|
||||
s.mem = nil
|
||||
s.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeKey(k []byte) string {
|
||||
return hex.EncodeToString(k)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestGetPut(t *testing.T) {
|
||||
|
@ -22,6 +23,7 @@ func TestGetPut(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
assert.Equal(t, value, newVal)
|
||||
require.NoError(t, s.Close())
|
||||
}
|
||||
|
||||
func TestKeyNotExist(t *testing.T) {
|
||||
|
@ -33,6 +35,7 @@ func TestKeyNotExist(t *testing.T) {
|
|||
_, err := s.Get(key)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, err.Error(), "key not found")
|
||||
require.NoError(t, s.Close())
|
||||
}
|
||||
|
||||
func TestPutBatch(t *testing.T) {
|
||||
|
@ -54,4 +57,5 @@ func TestPutBatch(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
assert.Equal(t, value, newVal)
|
||||
require.NoError(t, s.Close())
|
||||
}
|
||||
|
|
|
@ -92,3 +92,8 @@ func (s *RedisStore) Seek(k []byte, f func(k, v []byte)) {
|
|||
f([]byte(key), []byte(val))
|
||||
}
|
||||
}
|
||||
|
||||
// Close implements the Store interface.
|
||||
func (s *RedisStore) Close() error {
|
||||
return s.client.Close()
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/alicebob/miniredis"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNewRedisBatch(t *testing.T) {
|
||||
|
@ -26,6 +27,7 @@ func TestNewRedisStore(t *testing.T) {
|
|||
assert.Nil(t, err, "NewRedisStore Get error")
|
||||
|
||||
assert.Equal(t, value, result)
|
||||
require.NoError(t, redisStore.Close())
|
||||
redisMock.Close()
|
||||
}
|
||||
|
||||
|
@ -123,6 +125,7 @@ func TestRedisStore_GetAndPut(t *testing.T) {
|
|||
redisMock.FlushDB()
|
||||
})
|
||||
}
|
||||
require.NoError(t, redisStore.Close())
|
||||
redisMock.Close()
|
||||
}
|
||||
|
||||
|
@ -134,6 +137,7 @@ func TestRedisStore_PutBatch(t *testing.T) {
|
|||
result, err := redisStore.Get([]byte("foo1"))
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, []byte("bar1"), result)
|
||||
require.NoError(t, redisStore.Close())
|
||||
mock.Close()
|
||||
}
|
||||
|
||||
|
@ -142,6 +146,7 @@ func TestRedisStore_Seek(t *testing.T) {
|
|||
redisStore.Seek([]byte("foo"), func(k, v []byte) {
|
||||
assert.Equal(t, []byte("bar"), v)
|
||||
})
|
||||
require.NoError(t, redisStore.Close())
|
||||
mock.Close()
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
)
|
||||
|
@ -37,6 +36,7 @@ type (
|
|||
Put(k, v []byte) error
|
||||
PutBatch(Batch) error
|
||||
Seek(k []byte, f func(k, v []byte))
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Batch represents an abstraction on top of batch operations.
|
||||
|
@ -75,18 +75,18 @@ func AppendPrefixInt(k KeyPrefix, n int) []byte {
|
|||
}
|
||||
|
||||
// NewStore creates storage with preselected in configuration database type.
|
||||
func NewStore(context context.Context, cfg DBConfiguration) (Store, error) {
|
||||
func NewStore(cfg DBConfiguration) (Store, error) {
|
||||
var store Store
|
||||
var err error
|
||||
switch cfg.Type {
|
||||
case "leveldb":
|
||||
store, err = NewLevelDBStore(context, cfg.LevelDBOptions)
|
||||
store, err = NewLevelDBStore(cfg.LevelDBOptions)
|
||||
case "inmemory":
|
||||
store = NewMemoryStore()
|
||||
case "redis":
|
||||
store, err = NewRedisStore(cfg.RedisDBOptions)
|
||||
case "boltdb":
|
||||
store, err = NewBoltDBStore(context, cfg.BoltDBOptions)
|
||||
store, err = NewBoltDBStore(cfg.BoltDBOptions)
|
||||
}
|
||||
return store, err
|
||||
}
|
||||
|
|
|
@ -246,8 +246,7 @@ func TestHandler(t *testing.T) {
|
|||
cfg, err := config.Load(configPath, net)
|
||||
require.NoError(t, err, "could not load config")
|
||||
|
||||
store, err := storage.NewLevelDBStore(context.Background(),
|
||||
cfg.ApplicationConfiguration.DBConfiguration.LevelDBOptions)
|
||||
store, err := storage.NewLevelDBStore(cfg.ApplicationConfiguration.DBConfiguration.LevelDBOptions)
|
||||
assert.Nil(t, err)
|
||||
chain, err := core.NewBlockchain(context.Background(), store, cfg.ProtocolConfiguration)
|
||||
require.NoError(t, err, "could not create levelDB chain")
|
||||
|
|
Loading…
Reference in a new issue