[#65] Support FS bucket and BoltDB bucket
These buckets can be used to store blobs and metadata. They will be removed as enhanced blob storage will be implemented for neofs-node. To setup storage type use `storage.object.type` and `storage.meta.type` params. Available options: - inmemory (default) - boltdb - filesystem Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
861bac3892
commit
6ee908c2db
4 changed files with 87 additions and 23 deletions
|
@ -13,11 +13,15 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/misc"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/container"
|
||||
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/boltdb"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/bucket/fsbucket"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||
nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/network"
|
||||
tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/viper"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
|
@ -56,6 +60,9 @@ const (
|
|||
cfgContainerFee = "container.fee"
|
||||
|
||||
cfgMaxObjectSize = "node.maxobjectsize" // get value from chain
|
||||
|
||||
cfgObjectStorage = "storage.object"
|
||||
cfgMetaStorage = "storage.meta"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -138,6 +145,10 @@ type cfgObject struct {
|
|||
cnrStorage container.Source
|
||||
|
||||
maxObjectSize uint64
|
||||
|
||||
metastorage bucket.Bucket
|
||||
|
||||
blobstorage bucket.Bucket
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -173,7 +184,7 @@ func initCfg(path string) *cfg {
|
|||
maxChunkSize := viperCfg.GetUint64(cfgMaxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
|
||||
maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
|
||||
|
||||
return &cfg{
|
||||
c := &cfg{
|
||||
ctx: context.Background(),
|
||||
viper: viperCfg,
|
||||
log: log,
|
||||
|
@ -204,6 +215,10 @@ func initCfg(path string) *cfg {
|
|||
},
|
||||
localAddr: netAddr,
|
||||
}
|
||||
|
||||
initLocalStorage(c)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func initViper(path string) *viper.Viper {
|
||||
|
@ -246,6 +261,9 @@ func defaultConfiguration(v *viper.Viper) {
|
|||
v.SetDefault(cfgNetmapContract, "75194459637323ea8837d2afe8225ec74a5658c3")
|
||||
v.SetDefault(cfgNetmapFee, "1")
|
||||
|
||||
v.SetDefault(cfgObjectStorage+".type", "inmemory")
|
||||
v.SetDefault(cfgMetaStorage+".type", "inmemory")
|
||||
|
||||
v.SetDefault(cfgLogLevel, "info")
|
||||
v.SetDefault(cfgLogFormat, "console")
|
||||
v.SetDefault(cfgLogTrace, "fatal")
|
||||
|
@ -256,3 +274,43 @@ func defaultConfiguration(v *viper.Viper) {
|
|||
func (c *cfg) LocalAddress() *network.Address {
|
||||
return c.localAddr
|
||||
}
|
||||
|
||||
func initLocalStorage(c *cfg) {
|
||||
var err error
|
||||
|
||||
c.cfgObject.blobstorage, err = initBucket(cfgObjectStorage, c)
|
||||
fatalOnErr(err)
|
||||
|
||||
c.cfgObject.metastorage, err = initBucket(cfgMetaStorage, c)
|
||||
fatalOnErr(err)
|
||||
}
|
||||
|
||||
func initBucket(prefix string, c *cfg) (bucket bucket.Bucket, err error) {
|
||||
const inmemory = "inmemory"
|
||||
|
||||
switch c.viper.GetString(prefix + ".type") {
|
||||
case inmemory:
|
||||
bucket = newBucket()
|
||||
c.log.Info("using in-memory bucket", zap.String("storage", prefix))
|
||||
case boltdb.Name:
|
||||
opts, err := boltdb.NewOptions(prefix, c.viper)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't create boltdb opts")
|
||||
}
|
||||
bucket, err = boltdb.NewBucket(&opts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't create boltdb bucket")
|
||||
}
|
||||
c.log.Info("using boltdb bucket", zap.String("storage", prefix))
|
||||
case fsbucket.Name:
|
||||
bucket, err = fsbucket.NewBucket(prefix, c.viper)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't create fs bucket")
|
||||
}
|
||||
c.log.Info("using filesystem bucket", zap.String("storage", prefix))
|
||||
default:
|
||||
return nil, errors.New("unknown storage type")
|
||||
}
|
||||
|
||||
return bucket, nil
|
||||
}
|
||||
|
|
|
@ -149,7 +149,10 @@ func (s *objectSvc) GetRangeHash(ctx context.Context, req *object.GetRangeHashRe
|
|||
}
|
||||
|
||||
func initObjectService(c *cfg) {
|
||||
ls := localstore.New(newBucket(), newBucket())
|
||||
ls := localstore.New(
|
||||
c.cfgObject.blobstorage,
|
||||
c.cfgObject.metastorage,
|
||||
)
|
||||
keyStorage := util.NewKeyStorage(c.key, c.privateTokenStore)
|
||||
nodeOwner := owner.NewID()
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ const defaultFilePermission = 0777
|
|||
|
||||
var errEmptyPath = errors.New("database empty path")
|
||||
|
||||
const name = "boltbucket"
|
||||
const Name = "boltdb"
|
||||
|
||||
func makeCopy(val []byte) []byte {
|
||||
tmp := make([]byte, len(val))
|
||||
|
@ -41,7 +41,9 @@ func makeCopy(val []byte) []byte {
|
|||
}
|
||||
|
||||
// NewOptions prepares options for badger instance.
|
||||
func NewOptions(v *viper.Viper) (opts Options, err error) {
|
||||
func NewOptions(prefix string, v *viper.Viper) (opts Options, err error) {
|
||||
prefix = prefix + "." + Name
|
||||
|
||||
opts = Options{
|
||||
Options: bbolt.Options{
|
||||
// set defaults:
|
||||
|
@ -49,30 +51,30 @@ func NewOptions(v *viper.Viper) (opts Options, err error) {
|
|||
FreelistType: bbolt.DefaultOptions.FreelistType,
|
||||
|
||||
// set config options:
|
||||
NoSync: v.GetBool(name + ".no_sync"),
|
||||
ReadOnly: v.GetBool(name + ".read_only"),
|
||||
NoGrowSync: v.GetBool(name + ".no_grow_sync"),
|
||||
NoFreelistSync: v.GetBool(name + ".no_freelist_sync"),
|
||||
NoSync: v.GetBool(prefix + ".no_sync"),
|
||||
ReadOnly: v.GetBool(prefix + ".read_only"),
|
||||
NoGrowSync: v.GetBool(prefix + ".no_grow_sync"),
|
||||
NoFreelistSync: v.GetBool(prefix + ".no_freelist_sync"),
|
||||
|
||||
PageSize: v.GetInt(name + ".page_size"),
|
||||
MmapFlags: v.GetInt(name + ".mmap_flags"),
|
||||
InitialMmapSize: v.GetInt(name + ".initial_mmap_size"),
|
||||
PageSize: v.GetInt(prefix + ".page_size"),
|
||||
MmapFlags: v.GetInt(prefix + ".mmap_flags"),
|
||||
InitialMmapSize: v.GetInt(prefix + ".initial_mmap_size"),
|
||||
},
|
||||
|
||||
Name: []byte(name),
|
||||
Name: []byte(Name),
|
||||
Perm: defaultFilePermission,
|
||||
Path: v.GetString(name + ".path"),
|
||||
Path: v.GetString(prefix + ".path"),
|
||||
}
|
||||
|
||||
if opts.Path == "" {
|
||||
return opts, errEmptyPath
|
||||
}
|
||||
|
||||
if tmp := v.GetDuration(name + ".lock_timeout"); tmp > 0 {
|
||||
if tmp := v.GetDuration(prefix + ".lock_timeout"); tmp > 0 {
|
||||
opts.Timeout = tmp
|
||||
}
|
||||
|
||||
if perm := v.GetUint32(name + ".perm"); perm != 0 {
|
||||
if perm := v.GetUint32(prefix + ".perm"); perm != 0 {
|
||||
opts.Perm = os.FileMode(perm)
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
const name = "fsbucket"
|
||||
const Name = "filesystem"
|
||||
|
||||
const (
|
||||
defaultDirectory = "fsbucket"
|
||||
|
@ -51,7 +51,8 @@ func decodeKey(key string) []byte {
|
|||
}
|
||||
|
||||
// NewBucket creates new file system bucket instance.
|
||||
func NewBucket(v *viper.Viper) (bucket.Bucket, error) {
|
||||
func NewBucket(prefix string, v *viper.Viper) (bucket.Bucket, error) {
|
||||
prefix = prefix + "." + Name
|
||||
var (
|
||||
dir string
|
||||
perm os.FileMode
|
||||
|
@ -60,27 +61,27 @@ func NewBucket(v *viper.Viper) (bucket.Bucket, error) {
|
|||
depth int
|
||||
)
|
||||
|
||||
if dir = v.GetString(name + ".directory"); dir == "" {
|
||||
if dir = v.GetString(prefix + ".directory"); dir == "" {
|
||||
dir = defaultDirectory
|
||||
}
|
||||
|
||||
if perm = os.FileMode(v.GetInt(name + ".permissions")); perm == 0 {
|
||||
if perm = os.FileMode(v.GetInt(prefix + ".permissions")); perm == 0 {
|
||||
perm = defaultPermissions
|
||||
}
|
||||
|
||||
if depth = v.GetInt(name + ".depth"); depth <= 0 {
|
||||
if depth = v.GetInt(prefix + ".depth"); depth <= 0 {
|
||||
depth = defaultDepth
|
||||
}
|
||||
|
||||
if prefixLen = v.GetInt(name + ".prefix_len"); prefixLen <= 0 {
|
||||
if prefixLen = v.GetInt(prefix + ".prefix_len"); prefixLen <= 0 {
|
||||
prefixLen = defaultPrefixLen
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(dir, perm); err != nil {
|
||||
return nil, errors.Wrapf(err, "could not create bucket %s", name)
|
||||
return nil, errors.Wrapf(err, "could not create bucket %s", Name)
|
||||
}
|
||||
|
||||
if v.GetBool(name + ".tree_enabled") {
|
||||
if v.GetBool(prefix + ".tree_enabled") {
|
||||
b := &treeBucket{
|
||||
dir: dir,
|
||||
perm: perm,
|
||||
|
|
Loading…
Reference in a new issue