[#1523] neofs-node: Refactor configuration

1. Move compression parameters to the `shard` section.
2. Allow to use multiple sub-storage components in the blobstor.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-07-11 15:34:17 +03:00 committed by fyrchik
parent 13cdbde2e2
commit 26b4a258e0
37 changed files with 595 additions and 419 deletions

View file

@ -11,6 +11,7 @@ import (
"github.com/nspcc-dev/hrw"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobovnicza"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@ -76,8 +77,6 @@ type Blobovniczas struct {
// list of active (opened, non-filled) Blobovniczas
activeMtx sync.RWMutex
active map[string]blobovniczaWithIndex
onClose []func()
}
type blobovniczaWithIndex struct {
@ -887,3 +886,8 @@ func u64FromHexString(str string) uint64 {
func (b *Blobovniczas) Type() string {
return "blobovniczas"
}
// SetCompressor implements common.Storage.
func (b *Blobovniczas) SetCompressor(cc *compression.CConfig) {
b.CConfig = cc
}

View file

@ -19,16 +19,6 @@ func (b *Blobovniczas) Open(readOnly bool) error {
func (b *Blobovniczas) Init() error {
b.log.Debug("initializing Blobovnicza's")
err := b.CConfig.Init()
if err != nil {
return err
}
b.onClose = append(b.onClose, func() {
if err := b.CConfig.Close(); err != nil {
b.log.Debug("can't close zstd compressor", zap.String("err", err.Error()))
}
})
if b.readOnly {
b.log.Debug("read-only mode, skip blobovniczas initialization...")
return nil
@ -78,9 +68,5 @@ func (b *Blobovniczas) Close() error {
b.activeMtx.Unlock()
for i := range b.onClose {
b.onClose[i]()
}
return nil
}

View file

@ -36,7 +36,6 @@ func initConfig(c *cfg) {
openedCacheSize: defaultOpenedCacheSize,
blzShallowDepth: defaultBlzShallowDepth,
blzShallowWidth: defaultBlzShallowWidth,
CConfig: new(compression.CConfig),
}
}
@ -53,12 +52,6 @@ func WithPermissions(perm fs.FileMode) Option {
}
}
func WithCompressionConfig(cc *compression.CConfig) Option {
return func(c *cfg) {
c.CConfig = cc
}
}
func WithBlobovniczaShallowWidth(width uint64) Option {
return func(c *cfg) {
c.blzShallowWidth = width

View file

@ -1,12 +1,8 @@
package blobstor
import (
"encoding/hex"
"io/fs"
"path/filepath"
"sync"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
@ -28,8 +24,6 @@ type BlobStor struct {
modeMtx sync.RWMutex
mode mode.Mode
storage [2]SubStorage
}
type Info = fstree.Info
@ -39,37 +33,12 @@ type Option func(*cfg)
type cfg struct {
compression.CConfig
fsTreeDepth int
fsTreeInfo fstree.Info
smallSizeLimit uint64
log *logger.Logger
blzOpts []blobovniczatree.Option
log *logger.Logger
storage []SubStorage
}
const (
defaultShallowDepth = 4
defaultPerm = 0700
defaultSmallSizeLimit = 1 << 20 // 1MB
)
const blobovniczaDir = "blobovnicza"
func initConfig(c *cfg) {
*c = cfg{
fsTreeDepth: defaultShallowDepth,
fsTreeInfo: Info{
Permissions: defaultPerm,
RootPath: "./",
},
smallSizeLimit: defaultSmallSizeLimit,
log: zap.L(),
}
c.blzOpts = []blobovniczatree.Option{blobovniczatree.WithCompressionConfig(&c.CConfig)}
c.log = zap.L()
}
// New creates, initializes and returns new BlobStor instance.
@ -81,23 +50,10 @@ func New(opts ...Option) *BlobStor {
opts[i](&bs.cfg)
}
bs.storage[0].Storage = blobovniczatree.NewBlobovniczaTree(bs.blzOpts...)
bs.storage[0].Policy = func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) <= bs.cfg.smallSizeLimit
for i := range bs.storage {
bs.storage[i].Storage.SetCompressor(&bs.CConfig)
}
bs.storage[1].Storage = &fstree.FSTree{
Info: bs.cfg.fsTreeInfo,
Depth: bs.cfg.fsTreeDepth,
DirNameLen: hex.EncodedLen(fstree.DirNameLen),
CConfig: &bs.cfg.CConfig,
}
bs.storage[1].Policy = func(*objectSDK.Object, []byte) bool {
return true
}
bs.blzOpts = nil
return bs
}
@ -106,17 +62,17 @@ func (b *BlobStor) SetLogger(l *zap.Logger) {
b.log = l
}
// WithShallowDepth returns option to set the
// depth of the object file subdirectory tree.
//
// Depth is reduced to maximum value in case of overflow.
func WithShallowDepth(depth int) Option {
// WithStorages provides sub-blobstors.
func WithStorages(st []SubStorage) Option {
return func(c *cfg) {
if depth > fstree.MaxDepth {
depth = fstree.MaxDepth
}
c.storage = st
}
}
c.fsTreeDepth = depth
// WithLogger returns option to specify BlobStor's logger.
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l.With(zap.String("component", "BlobStor"))
}
}
@ -141,70 +97,3 @@ func WithUncompressableContentTypes(values []string) Option {
c.UncompressableContentTypes = values
}
}
// WithRootPath returns option to set path to root directory
// of the fs tree to write the objects.
func WithRootPath(rootDir string) Option {
return func(c *cfg) {
c.fsTreeInfo.RootPath = rootDir
c.blzOpts = append(c.blzOpts, blobovniczatree.WithRootPath(filepath.Join(rootDir, blobovniczaDir)))
}
}
// WithRootPerm returns option to set permission
// bits of the fs tree.
func WithRootPerm(perm fs.FileMode) Option {
return func(c *cfg) {
c.fsTreeInfo.Permissions = perm
c.blzOpts = append(c.blzOpts, blobovniczatree.WithPermissions(perm))
}
}
// WithSmallSizeLimit returns option to set maximum size of
// "small" object.
func WithSmallSizeLimit(lim uint64) Option {
return func(c *cfg) {
c.smallSizeLimit = lim
c.blzOpts = append(c.blzOpts, blobovniczatree.WithObjectSizeLimit(lim))
}
}
// WithLogger returns option to specify BlobStor's logger.
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l.With(zap.String("component", "BlobStor"))
c.blzOpts = append(c.blzOpts, blobovniczatree.WithLogger(c.log))
}
}
// WithBlobovniczaShallowDepth returns option to specify
// depth of blobovnicza directories.
func WithBlobovniczaShallowDepth(d uint64) Option {
return func(c *cfg) {
c.blzOpts = append(c.blzOpts, blobovniczatree.WithBlobovniczaShallowDepth(d))
}
}
// WithBlobovniczaShallowWidth returns option to specify
// width of blobovnicza directories.
func WithBlobovniczaShallowWidth(w uint64) Option {
return func(c *cfg) {
c.blzOpts = append(c.blzOpts, blobovniczatree.WithBlobovniczaShallowWidth(w))
}
}
// WithBlobovniczaOpenedCacheSize return option to specify
// maximum number of opened non-active blobovnicza's.
func WithBlobovniczaOpenedCacheSize(sz int) Option {
return func(c *cfg) {
c.blzOpts = append(c.blzOpts, blobovniczatree.WithOpenedCacheSize(sz))
}
}
// WithBlobovniczaSize returns option to specify maximum volume
// of each blobovnicza.
func WithBlobovniczaSize(sz uint64) Option {
return func(c *cfg) {
c.blzOpts = append(c.blzOpts, blobovniczatree.WithBlobovniczaSize(sz))
}
}

View file

@ -2,14 +2,35 @@ package blobstor
import (
"os"
"path/filepath"
"testing"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
"github.com/stretchr/testify/require"
)
const blobovniczaDir = "blobovniczas"
func defaultStorages(p string, smallSizeLimit uint64) []SubStorage {
return []SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(p, "blobovniczas")),
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) <= smallSizeLimit
},
},
{
Storage: fstree.New(fstree.WithPath(p)),
},
}
}
func TestCompression(t *testing.T) {
dir, err := os.MkdirTemp("", "neofs*")
require.NoError(t, err)
@ -21,10 +42,9 @@ func TestCompression(t *testing.T) {
)
newBlobStor := func(t *testing.T, compress bool) *BlobStor {
bs := New(WithCompressObjects(compress),
WithRootPath(dir),
WithSmallSizeLimit(smallSizeLimit),
WithBlobovniczaShallowWidth(1)) // default width is 16, slow init
bs := New(
WithCompressObjects(compress),
WithStorages(defaultStorages(dir, smallSizeLimit)))
require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init())
return bs
@ -86,11 +106,22 @@ func TestBlobstor_needsCompression(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(dir) })
bs := New(WithCompressObjects(compress),
WithRootPath(dir),
WithSmallSizeLimit(smallSizeLimit),
WithBlobovniczaShallowWidth(1),
WithUncompressableContentTypes(ct))
bs := New(
WithCompressObjects(compress),
WithUncompressableContentTypes(ct),
WithStorages([]SubStorage{
{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(filepath.Join(dir, "blobovnicza")),
blobovniczatree.WithBlobovniczaShallowWidth(1)), // default width is 16, slow init
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < smallSizeLimit
},
},
{
Storage: fstree.New(fstree.WithPath(dir)),
},
}))
require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init())
return bs

View file

@ -1,5 +1,7 @@
package common
import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
// Storage represents key-value object storage.
// It is used as a building block for a blobstor of a shard.
type Storage interface {
@ -8,6 +10,7 @@ type Storage interface {
Close() error
Type() string
SetCompressor(cc *compression.CConfig)
Get(GetPrm) (GetRes, error)
GetRange(GetRangePrm) (GetRangeRes, error)

View file

@ -31,6 +31,10 @@ var ErrInitBlobovniczas = errors.New("failure on blobovnicza initialization stag
func (b *BlobStor) Init() error {
b.log.Debug("initializing...")
if err := b.CConfig.Init(); err != nil {
return err
}
for i := range b.storage {
err := b.storage[i].Storage.Init()
if err != nil {
@ -55,5 +59,10 @@ func (b *BlobStor) Close() error {
continue
}
}
err := b.CConfig.Close()
if firstErr == nil {
firstErr = err
}
return firstErr
}

View file

@ -26,7 +26,7 @@ func (b *BlobStor) Delete(prm common.DeletePrm) (common.DeleteRes, error) {
}
}
if len(prm.StorageID) == 0 {
return b.storage[1].Storage.Delete(prm)
return b.storage[len(b.storage)-1].Storage.Delete(prm)
}
return b.storage[0].Storage.Delete(prm)
}

View file

@ -20,9 +20,8 @@ func TestExists(t *testing.T) {
const smallSizeLimit = 512
b := New(WithRootPath(dir),
WithSmallSizeLimit(smallSizeLimit),
WithBlobovniczaShallowWidth(1)) // default width is 16, slow init
b := New(
WithStorages(defaultStorages(dir, smallSizeLimit)))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
@ -65,7 +64,7 @@ func TestExists(t *testing.T) {
require.NotEmpty(t, bigDir)
require.NoError(t, os.Chmod(dir, 0))
t.Cleanup(func() { require.NoError(t, os.Chmod(dir, b.fsTreeInfo.Permissions)) })
t.Cleanup(func() { require.NoError(t, os.Chmod(dir, 0777)) })
// Object exists, first error is logged.
prm.Address = objectCore.AddressOf(objects[0])

View file

@ -1,10 +1,16 @@
package fstree
import (
"github.com/nspcc-dev/neofs-node/pkg/util"
)
// Open implements common.Storage.
func (*FSTree) Open(bool) error { return nil }
// Init implements common.Storage.
func (*FSTree) Init() error { return nil }
func (t *FSTree) Init() error {
return util.MkdirAllX(t.RootPath, t.Permissions)
}
// Close implements common.Storage.
func (*FSTree) Close() error { return nil }

View file

@ -45,6 +45,23 @@ const (
var _ common.Storage = (*FSTree)(nil)
func New(opts ...Option) *FSTree {
f := &FSTree{
Info: Info{
Permissions: 0700,
RootPath: "./",
},
CConfig: nil,
Depth: 4,
DirNameLen: DirNameLen,
}
for i := range opts {
opts[i](f)
}
return f
}
func stringifyAddress(addr oid.Address) string {
return addr.Object().EncodeToString() + "." + addr.Container().EncodeToString()
}
@ -299,3 +316,8 @@ func (t *FSTree) NumberOfObjects() (uint64, error) {
func (*FSTree) Type() string {
return "fstree"
}
// SetCompressor implements common.Storage.
func (t *FSTree) SetCompressor(cc *compression.CConfig) {
t.CConfig = cc
}

View file

@ -0,0 +1,31 @@
package fstree
import (
"io/fs"
)
type Option func(*FSTree)
func WithDepth(d int) Option {
return func(f *FSTree) {
f.Depth = d
}
}
func WithDirNameLen(l int) Option {
return func(f *FSTree) {
f.DirNameLen = l
}
}
func WithPerm(p fs.FileMode) Option {
return func(f *FSTree) {
f.Permissions = p
}
}
func WithPath(p string) Option {
return func(f *FSTree) {
f.RootPath = p
}
}

View file

@ -23,7 +23,7 @@ func (b *BlobStor) Get(prm common.GetPrm) (common.GetRes, error) {
return common.GetRes{}, errNotFound
}
if len(prm.StorageID) == 0 {
return b.storage[1].Storage.Get(prm)
return b.storage[len(b.storage)-1].Storage.Get(prm)
}
return b.storage[0].Storage.Get(prm)
}

View file

@ -23,7 +23,7 @@ func (b *BlobStor) GetRange(prm common.GetRangePrm) (common.GetRangeRes, error)
return common.GetRangeRes{}, errNotFound
}
if len(prm.StorageID) == 0 {
return b.storage[1].Storage.GetRange(prm)
return b.storage[len(b.storage)-1].Storage.GetRange(prm)
}
return b.storage[0].Storage.GetRange(prm)
}

View file

@ -4,5 +4,10 @@ import "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree
// DumpInfo returns information about blob stor.
func (b *BlobStor) DumpInfo() fstree.Info {
return b.cfg.fsTreeInfo
for i := range b.storage {
if b.storage[i].Storage.Type() == "fstree" {
return b.storage[i].Storage.(*fstree.FSTree).Info
}
}
return fstree.Info{}
}

View file

@ -19,7 +19,7 @@ func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) {
for i := range b.storage {
_, err := b.storage[i].Storage.Iterate(prm)
if err != nil && !prm.IgnoreErrors {
return common.IterateRes{}, fmt.Errorf("blobovnizas iterator failure: %w", err)
return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err)
}
}
return common.IterateRes{}, nil

View file

@ -18,11 +18,8 @@ func TestIterateObjects(t *testing.T) {
// create BlobStor instance
blobStor := New(
WithStorages(defaultStorages(p, smalSz)),
WithCompressObjects(true),
WithRootPath(p),
WithSmallSizeLimit(smalSz),
WithBlobovniczaShallowWidth(1),
WithBlobovniczaShallowDepth(1),
)
defer os.RemoveAll(p)

View file

@ -33,7 +33,7 @@ func (b *BlobStor) Put(prm common.PutPrm) (common.PutRes, error) {
}
for i := range b.storage {
if b.storage[i].Policy(prm.Object, prm.RawData) {
if b.storage[i].Policy == nil || b.storage[i].Policy(prm.Object, prm.RawData) {
res, err := b.storage[i].Storage.Put(prm)
if err == nil {
storagelog.Write(b.log,