[#136] cmd/neofs-node: Use new metabase in app
Remove BoltDB bucket package. Construct meta.DB instance in node app. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
c0aa892161
commit
8125b544b4
4 changed files with 24 additions and 216 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"crypto/ecdsa"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -16,8 +17,8 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
||||
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/boltdb"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/fsbucket"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
|
||||
nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
|
||||
|
@ -29,6 +30,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/util/profiler"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/viper"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
@ -75,7 +77,9 @@ const (
|
|||
cfgContainerFee = "container.fee"
|
||||
|
||||
cfgObjectStorage = "storage.object"
|
||||
cfgMetaStorage = "storage.meta"
|
||||
|
||||
cfgMetaBasePath = "storage.metabase.path"
|
||||
cfgMetaBasePerm = "storage.metabase.perm"
|
||||
|
||||
cfgGCQueueSize = "gc.queuesize"
|
||||
cfgGCQueueTick = "gc.duration.sleep"
|
||||
|
@ -193,7 +197,7 @@ type cfgObject struct {
|
|||
|
||||
cnrStorage container.Source
|
||||
|
||||
metastorage bucket.Bucket
|
||||
metastorage *meta.DB
|
||||
|
||||
blobstorage bucket.Bucket
|
||||
|
||||
|
@ -319,7 +323,9 @@ func defaultConfiguration(v *viper.Viper) {
|
|||
v.SetDefault(cfgNetmapFee, "1")
|
||||
|
||||
v.SetDefault(cfgObjectStorage+".type", "inmemory")
|
||||
v.SetDefault(cfgMetaStorage+".type", "inmemory")
|
||||
|
||||
v.SetDefault(cfgMetaBasePath, "metabase")
|
||||
v.SetDefault(cfgMetaBasePerm, 0600)
|
||||
|
||||
v.SetDefault(cfgLogLevel, "info")
|
||||
v.SetDefault(cfgLogFormat, "console")
|
||||
|
@ -355,8 +361,14 @@ func initLocalStorage(c *cfg) {
|
|||
c.cfgObject.blobstorage, err = initBucket(cfgObjectStorage, c)
|
||||
fatalOnErr(err)
|
||||
|
||||
c.cfgObject.metastorage, err = initBucket(cfgMetaStorage, c)
|
||||
boltDB, err := bbolt.Open(
|
||||
c.viper.GetString(cfgMetaBasePath),
|
||||
os.FileMode(c.viper.GetUint32(cfgMetaBasePerm)),
|
||||
nil,
|
||||
)
|
||||
fatalOnErr(err)
|
||||
|
||||
c.cfgObject.metastorage = meta.NewDB(boltDB)
|
||||
}
|
||||
|
||||
func initBucket(prefix string, c *cfg) (bucket bucket.Bucket, err error) {
|
||||
|
@ -366,16 +378,6 @@ func initBucket(prefix string, c *cfg) (bucket bucket.Bucket, err error) {
|
|||
case inmemory:
|
||||
bucket = newBucket()
|
||||
c.log.Info("using in-memory bucket", zap.String("storage", prefix))
|
||||
case boltdb.Name:
|
||||
opts, err := boltdb.NewOptions(prefix, c.viper)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't create boltdb opts")
|
||||
}
|
||||
bucket, err = boltdb.NewBucket(&opts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't create boltdb bucket")
|
||||
}
|
||||
c.log.Info("using boltdb bucket", zap.String("storage", prefix))
|
||||
case fsbucket.Name:
|
||||
bucket, err = fsbucket.NewBucket(prefix, c.viper)
|
||||
if err != nil {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"log"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/grace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func fatalOnErr(err error) {
|
||||
|
@ -57,6 +58,12 @@ func wait(c *cfg) {
|
|||
}
|
||||
|
||||
func shutdown(c *cfg) {
|
||||
if err := c.cfgObject.metastorage.Close(); err != nil {
|
||||
c.log.Error("could not close metabase",
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
c.cfgGRPC.server.GracefulStop()
|
||||
|
||||
c.log.Info("gRPC server stopped")
|
||||
|
|
|
@ -1,107 +0,0 @@
|
|||
package boltdb
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/viper"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type (
|
||||
boltBucket struct {
|
||||
db *bbolt.DB
|
||||
name []byte
|
||||
}
|
||||
|
||||
// Options groups the BoltDB bucket's options.
|
||||
Options struct {
|
||||
bbolt.Options
|
||||
Name []byte
|
||||
Path string
|
||||
Perm os.FileMode
|
||||
}
|
||||
)
|
||||
|
||||
const defaultFilePermission = 0777
|
||||
|
||||
var errEmptyPath = errors.New("database empty path")
|
||||
|
||||
const Name = "boltdb"
|
||||
|
||||
func makeCopy(val []byte) []byte {
|
||||
tmp := make([]byte, len(val))
|
||||
copy(tmp, val)
|
||||
|
||||
return tmp
|
||||
}
|
||||
|
||||
// NewOptions prepares options for badger instance.
|
||||
func NewOptions(prefix string, v *viper.Viper) (opts Options, err error) {
|
||||
prefix = prefix + "." + Name
|
||||
|
||||
opts = Options{
|
||||
Options: bbolt.Options{
|
||||
// set defaults:
|
||||
Timeout: bbolt.DefaultOptions.Timeout,
|
||||
FreelistType: bbolt.DefaultOptions.FreelistType,
|
||||
|
||||
// set config options:
|
||||
NoSync: v.GetBool(prefix + ".no_sync"),
|
||||
ReadOnly: v.GetBool(prefix + ".read_only"),
|
||||
NoGrowSync: v.GetBool(prefix + ".no_grow_sync"),
|
||||
NoFreelistSync: v.GetBool(prefix + ".no_freelist_sync"),
|
||||
|
||||
PageSize: v.GetInt(prefix + ".page_size"),
|
||||
MmapFlags: v.GetInt(prefix + ".mmap_flags"),
|
||||
InitialMmapSize: v.GetInt(prefix + ".initial_mmap_size"),
|
||||
},
|
||||
|
||||
Name: []byte(Name),
|
||||
Perm: defaultFilePermission,
|
||||
Path: v.GetString(prefix + ".path"),
|
||||
}
|
||||
|
||||
if opts.Path == "" {
|
||||
return opts, errEmptyPath
|
||||
}
|
||||
|
||||
if tmp := v.GetDuration(prefix + ".lock_timeout"); tmp > 0 {
|
||||
opts.Timeout = tmp
|
||||
}
|
||||
|
||||
if perm := v.GetUint32(prefix + ".perm"); perm != 0 {
|
||||
opts.Perm = os.FileMode(perm)
|
||||
}
|
||||
|
||||
base := path.Dir(opts.Path)
|
||||
if err := os.MkdirAll(base, opts.Perm); err != nil {
|
||||
return opts, errors.Wrapf(err, "could not use `%s` dir", base)
|
||||
}
|
||||
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
// NewBucket creates badger-bucket instance.
|
||||
func NewBucket(opts *Options) (bucket.Bucket, error) {
|
||||
log.SetOutput(ioutil.Discard) // disable default logger
|
||||
|
||||
db, err := bbolt.Open(opts.Path, opts.Perm, &opts.Options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(opts.Name)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &boltBucket{db: db, name: opts.Name}, nil
|
||||
}
|
|
@ -1,94 +0,0 @@
|
|||
package boltdb
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/mr-tron/base58"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
||||
"github.com/pkg/errors"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// Get value by key or return error.
|
||||
func (b *boltBucket) Get(key []byte) (data []byte, err error) {
|
||||
err = b.db.View(func(txn *bbolt.Tx) error {
|
||||
txn.Bucket(b.name).Cursor().Seek(key)
|
||||
val := txn.Bucket(b.name).Get(key)
|
||||
if val == nil {
|
||||
return errors.Wrapf(bucket.ErrNotFound, "key=%s", base58.Encode(key))
|
||||
}
|
||||
|
||||
data = makeCopy(val)
|
||||
return nil
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Set value for key.
|
||||
func (b *boltBucket) Set(key, value []byte) error {
|
||||
return b.db.Update(func(txn *bbolt.Tx) error {
|
||||
k, v := makeCopy(key), makeCopy(value)
|
||||
return txn.Bucket(b.name).Put(k, v)
|
||||
})
|
||||
}
|
||||
|
||||
// Del removes item from bucket by key.
|
||||
func (b *boltBucket) Del(key []byte) error {
|
||||
return b.db.Update(func(txn *bbolt.Tx) error {
|
||||
return txn.Bucket(b.name).Delete(key)
|
||||
})
|
||||
}
|
||||
|
||||
// Has checks key exists.
|
||||
func (b *boltBucket) Has(key []byte) bool {
|
||||
_, err := b.Get(key)
|
||||
return !errors.Is(errors.Cause(err), bucket.ErrNotFound)
|
||||
}
|
||||
|
||||
// Size returns size of database.
|
||||
func (b *boltBucket) Size() int64 {
|
||||
info, err := os.Stat(b.db.Path())
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
return info.Size()
|
||||
}
|
||||
|
||||
// List all items in bucket.
|
||||
func (b *boltBucket) List() ([][]byte, error) {
|
||||
var items [][]byte
|
||||
|
||||
if err := b.db.View(func(txn *bbolt.Tx) error {
|
||||
return txn.Bucket(b.name).ForEach(func(k, _ []byte) error {
|
||||
items = append(items, makeCopy(k))
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// Filter elements by filter closure.
|
||||
func (b *boltBucket) Iterate(handler bucket.FilterHandler) error {
|
||||
if handler == nil {
|
||||
return bucket.ErrNilFilterHandler
|
||||
}
|
||||
|
||||
return b.db.View(func(txn *bbolt.Tx) error {
|
||||
return txn.Bucket(b.name).ForEach(func(k, v []byte) error {
|
||||
if !handler(makeCopy(k), makeCopy(v)) {
|
||||
return bucket.ErrIteratingAborted
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// Close bucket database.
|
||||
func (b *boltBucket) Close() error {
|
||||
return b.db.Close()
|
||||
}
|
Loading…
Reference in a new issue