[#18] Add badgerstore substorage implementation #556

Closed
ale64bit wants to merge 1 commit from ale64bit/frostfs-node:feature/flat-blobstore-test into master
10 changed files with 607 additions and 5 deletions
Showing only changes of commit 440cf85d62 - Show all commits

View file

@ -21,6 +21,7 @@ import (
contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
badgerstoreconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore"
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
@ -32,6 +33,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
@ -176,6 +178,20 @@ type subStorageCfg struct {
width uint64
leafWidth uint64
openedCacheSize int
// badgerstore-specific
badger struct {
numCompactors int
numGoroutines int
numLevelZeroTables int
numLevelZeroTablesStall int
numMemtables int
memtableSize int64
maxLevels int
levelSizeMultiplier int
valueLogMaxEntries uint32
gcInterval time.Duration
}
}
// readConfig fills applicationConfiguration with raw configuration values
@ -291,6 +307,21 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
sCfg.width = sub.ShallowWidth()
sCfg.leafWidth = sub.LeafWidth()
sCfg.openedCacheSize = sub.OpenedCacheSize()
case badgerstore.Type:
sub := badgerstoreconfig.From((*config.Config)(storagesCfg[i]))
sCfg.badger.numCompactors = sub.NumCompactors()
sCfg.badger.numGoroutines = sub.NumGoroutines()
sCfg.badger.numLevelZeroTables = sub.NumLevelZeroTables()
sCfg.badger.numLevelZeroTablesStall = sub.NumLevelZeroTablesStall()
sCfg.badger.numMemtables = sub.NumMemtables()
sCfg.badger.memtableSize = sub.MemtableSize()
sCfg.badger.maxLevels = sub.MaxLevels()
sCfg.badger.levelSizeMultiplier = sub.LevelSizeMultiplier()
sCfg.badger.valueLogMaxEntries = sub.ValueLogMaxEntries()
sCfg.badger.gcInterval = sub.GCInterval()
case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth()
@ -797,6 +828,27 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
})
case badgerstore.Type:
ss = append(ss, blobstor.SubStorage{
Storage: badgerstore.New(
badgerstore.WithPath(sRead.path),
badgerstore.WithPermissions(sRead.perm),
badgerstore.WithLogger(c.log),
badgerstore.WithNumCompactors(sRead.badger.numCompactors),
badgerstore.WithNumGoroutines(sRead.badger.numGoroutines),
badgerstore.WithNumLevelZeroTables(sRead.badger.numLevelZeroTables),
badgerstore.WithNumLevelZeroTablesStall(sRead.badger.numLevelZeroTablesStall),
badgerstore.WithNumMemTables(sRead.badger.numMemtables),
badgerstore.WithMemtableSize(sRead.badger.memtableSize),
badgerstore.WithMaxLevels(sRead.badger.maxLevels),
badgerstore.WithLevelSizeMultiplier(sRead.badger.levelSizeMultiplier),
badgerstore.WithValueLogMaxEntries(sRead.badger.valueLogMaxEntries),
badgerstore.WithGCInterval(sRead.badger.gcInterval),
),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
})
case fstree.Type:
fstreeOpts := []fstree.Option{
fstree.WithPath(sRead.path),

View file

@ -0,0 +1,103 @@
package badgerstoreconfig
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"github.com/dgraph-io/badger/v4"
)
// Config is a wrapper over the config section
// which provides access to badger configuration.
// For information about specific parameters, see https://pkg.go.dev/github.com/dgraph-io/badger/v4.
type Config config.Config
var defaultOpts = badger.DefaultOptions("")
const (
DefaultGCInterval = 10 * time.Second
)
func From(c *config.Config) *Config { return (*Config)(c) }
func (x *Config) Type() string { return badgerstore.Type }
func (x *Config) NumCompactors() int {
s := int(config.IntSafe((*config.Config)(x), "num_compactors"))
if s > 0 {
return s
}
return defaultOpts.NumCompactors
}
func (x *Config) NumGoroutines() int {
s := int(config.IntSafe((*config.Config)(x), "num_goroutines"))
if s > 0 {
return s
}
return defaultOpts.NumGoroutines
}
func (x *Config) NumLevelZeroTables() int {
s := int(config.IntSafe((*config.Config)(x), "num_level_zero_tables"))
if s > 0 {
return s
}
return defaultOpts.NumLevelZeroTables
}
func (x *Config) NumLevelZeroTablesStall() int {
s := int(config.IntSafe((*config.Config)(x), "num_level_zero_tables_stall"))
if s > 0 {
return s
}
return defaultOpts.NumLevelZeroTablesStall
}
func (x *Config) NumMemtables() int {
s := int(config.IntSafe((*config.Config)(x), "num_memtables"))
if s > 0 {
return s
}
return defaultOpts.NumMemtables
}
func (x *Config) MemtableSize() int64 {
s := config.IntSafe((*config.Config)(x), "memtable_size")
if s > 0 {
return s
}
return defaultOpts.MemTableSize
}
func (x *Config) MaxLevels() int {
s := int(config.IntSafe((*config.Config)(x), "max_levels"))
if s > 0 {
return s
}
return defaultOpts.MaxLevels
}
func (x *Config) LevelSizeMultiplier() int {
s := int(config.IntSafe((*config.Config)(x), "level_size_multiplier"))
if s > 0 {
return s
}
return defaultOpts.LevelSizeMultiplier
}
func (x *Config) ValueLogMaxEntries() uint32 {
s := uint32(config.IntSafe((*config.Config)(x), "value_log_max_entries"))
if s > 0 {
return s
}
return defaultOpts.ValueLogMaxEntries
}
func (x *Config) GCInterval() time.Duration {
s := config.DurationSafe((*config.Config)(x), "gc_interval")
if s > 0 {
return s
}
return DefaultGCInterval
}

View file

@ -1,10 +1,14 @@
package blobstorconfig
import (
"fmt"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/storage"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
)
// Config is a wrapper over the config section
@ -23,11 +27,14 @@ func (x *Config) Storages() []*storage.Config {
typ := config.String(
(*config.Config)(x),
strconv.Itoa(i)+".type")
if typ == "" {
switch typ {
case "":
return ss
case fstree.Type, blobovniczatree.Type, badgerstore.Type:
sub := storage.From((*config.Config)(x).Sub(strconv.Itoa(i)))
ss = append(ss, sub)
default:
panic(fmt.Sprintf("invalid type: %q", typ))
}
sub := storage.From((*config.Config)(x).Sub(strconv.Itoa(i)))
ss = append(ss, sub)
}
}

View file

@ -9,6 +9,7 @@ import (
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -55,7 +56,7 @@ func validateConfig(c *config.Config) error {
}
for i := range blobstor {
switch blobstor[i].Type() {
case fstree.Type, blobovniczatree.Type:
case fstree.Type, blobovniczatree.Type, badgerstore.Type:
default:
return fmt.Errorf("unexpected storage type: %s (shard %d)", blobstor[i].Type(), shardNum)
}

View file

@ -0,0 +1,201 @@
package badgerstore
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/dgraph-io/badger/v4"
)
const (
Type = "badgerstore"
)
type badgerstoreImpl struct {
*cfg
db *badger.DB
closeCh chan struct{}
}
func New(opts ...Option) common.Storage {
st := &badgerstoreImpl{
cfg: defaultConfig(),
}
for _, opt := range opts {
opt(st.cfg)
}
st.badgerOptions = st.badgerOptions.WithDir(st.cfg.path)
st.badgerOptions = st.badgerOptions.WithValueDir(st.cfg.path)
return st
}
func (s *badgerstoreImpl) Get(_ context.Context, req common.GetPrm) (common.GetRes, error) {
addrKey := addressKey(req.Address)
var data []byte
if err := s.db.View(func(tx *badger.Txn) error {
it, err := tx.Get(addrKey[:])
if err != nil {
return err
}
data, err = it.ValueCopy(nil)
return err
}); err != nil {
if err == badger.ErrKeyNotFound {
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return common.GetRes{}, err
}
var err error
if data, err = s.compression.Decompress(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
return common.GetRes{Object: obj, RawData: data}, nil
}
func (s *badgerstoreImpl) GetRange(ctx context.Context, req common.GetRangePrm) (common.GetRangeRes, error) {
getResp, err := s.Get(ctx, common.GetPrm{
Address: req.Address,
StorageID: req.StorageID,
})
if err != nil {
return common.GetRangeRes{}, err
}
payload := getResp.Object.Payload()
from := req.Range.GetOffset()
to := from + req.Range.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
}
return common.GetRangeRes{
Data: payload[from:to],
}, nil
}
func (s *badgerstoreImpl) Exists(_ context.Context, req common.ExistsPrm) (common.ExistsRes, error) {
addrKey := addressKey(req.Address)
exists := true
if err := s.db.View(func(tx *badger.Txn) error {
_, err := tx.Get(addrKey[:])
if err == badger.ErrKeyNotFound {
exists = false
return nil
}
return err
}); err != nil {
return common.ExistsRes{}, err
}
return common.ExistsRes{Exists: exists}, nil
}
func (s *badgerstoreImpl) Put(_ context.Context, req common.PutPrm) (common.PutRes, error) {
if s.badgerOptions.ReadOnly {
return common.PutRes{}, common.ErrReadOnly
}
if !req.DontCompress {
req.RawData = s.compression.Compress(req.RawData)
}
addrKey := addressKey(req.Address)
wb := s.db.NewWriteBatch()
defer wb.Cancel()
_ = wb.Set(addrKey[:], req.RawData)
return common.PutRes{}, wb.Flush()
}
func (s *badgerstoreImpl) Delete(_ context.Context, req common.DeletePrm) (common.DeleteRes, error) {
if s.badgerOptions.ReadOnly {
return common.DeleteRes{}, common.ErrReadOnly
}
addrKey := addressKey(req.Address)
err := s.db.Update(func(tx *badger.Txn) error {
if _, err := tx.Get(addrKey[:]); err != nil {
return err
}
return tx.Delete(addrKey[:])
})
if err == badger.ErrKeyNotFound {
err = logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return common.DeleteRes{}, err
}
func (s *badgerstoreImpl) iterateKeyValue(req common.IteratePrm, k, v []byte) error {
elem := common.IterationElement{
ObjectData: v,
}
if err := decodeAddress(k, &elem.Address); err != nil {
if req.IgnoreErrors {
return nil
}
return logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, string(k), err))
}
var err error
if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {
if req.IgnoreErrors {
if req.ErrorHandler != nil {
return req.ErrorHandler(elem.Address, err)
}
return nil
}
return logicerr.Wrap(fmt.Errorf("(%T) decompressing data for address %q: %v", s, elem.Address.String(), err))
}
switch {
case req.Handler != nil:
if err := req.Handler(elem); err != nil {
return err
}
case req.LazyHandler != nil:
if err := req.LazyHandler(elem.Address, func() ([]byte, error) { return elem.ObjectData, nil }); err != nil {
return err
}
default:
if !req.IgnoreErrors {
return logicerr.Wrap(fmt.Errorf("(%T) no Handler or LazyHandler set for IteratePrm", s))
}
}
return nil
}
func (s *badgerstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common.IterateRes, error) {
err := s.db.View(func(tx *badger.Txn) error {
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
if err := it.Item().Value(func(val []byte) error {
return s.iterateKeyValue(req, it.Item().Key(), val)
}); err != nil {
return err
}
}
return nil
})
return common.IterateRes{}, err
}

View file

@ -0,0 +1,23 @@
package badgerstore
import (
"path/filepath"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap/zaptest"
)
func TestGeneric(t *testing.T) {
ctor := func(t *testing.T) common.Storage {
return New(
WithPath(filepath.Join(t.TempDir(), "badgerstore")),
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
WithGCInterval(10),
)
}
blobstortest.TestAll(t, ctor, 2048, 16*1024)
}

View file

@ -0,0 +1,49 @@
package badgerstore
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"github.com/dgraph-io/badger/v4"
)
func (s *badgerstoreImpl) Open(readOnly bool) (err error) {
s.badgerOptions = s.badgerOptions.WithReadOnly(readOnly)
s.db, err = badger.Open(s.badgerOptions)
return
}
func (s *badgerstoreImpl) gc() {
t := time.NewTicker(s.gcInterval)
defer t.Stop()
for {
select {
case <-s.closeCh:
return
case <-t.C:
for {
if err := s.db.RunValueLogGC(0.5); err != nil {
break
}
}
}
}
}
func (s *badgerstoreImpl) Init() error {
s.closeCh = make(chan struct{})
go s.gc()
return nil
}
func (s *badgerstoreImpl) Close() error {
close(s.closeCh)
return s.db.Close()
}
func (s *badgerstoreImpl) Type() string { return Type }
func (s *badgerstoreImpl) Path() string { return s.path }
func (s *badgerstoreImpl) SetCompressor(cc *compression.Config) { s.compression = cc }
func (s *badgerstoreImpl) Compressor() *compression.Config { return s.compression }
func (s *badgerstoreImpl) SetReportErrorFunc(f func(string, error)) { s.reportError = f }
func (s *badgerstoreImpl) SetParentID(string) {}

View file

@ -0,0 +1,31 @@
package badgerstore
import (
"fmt"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type internalKey [len(cid.ID{}) + len(oid.ID{})]byte
func decodeAddress(k []byte, addr *oid.Address) error {
if got, want := len(k), len(internalKey{}); got != want {
return fmt.Errorf("unexpected internal key len: got %d, want %d", got, want)
}
var cnr cid.ID
var obj oid.ID
copy(cnr[:], k[:len(cnr)])
copy(obj[:], k[len(cnr):])
addr.SetContainer(cnr)
addr.SetObject(obj)
return nil
}
func addressKey(addr oid.Address) internalKey {
var key internalKey
cnr, obj := addr.Container(), addr.Object()
copy(key[:len(cnr)], cnr[:])
copy(key[len(cnr):], obj[:])
return key
}

View file

@ -0,0 +1,18 @@
package badgerstore
import (
"testing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func TestInternalKey(t *testing.T) {
wantAddr := oidtest.Address()
k := addressKey(wantAddr)
var gotAddr oid.Address
require.NoError(t, decodeAddress(k[:], &gotAddr))
require.True(t, gotAddr.Equals(wantAddr))
}

View file

@ -0,0 +1,117 @@
package badgerstore
import (
"io/fs"
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/dgraph-io/badger/v4"
"go.uber.org/zap"
)
type cfg struct {
path string
perm fs.FileMode
log *logger.Logger
badgerOptions badger.Options
compression *compression.Config
reportError func(string, error)
gcInterval time.Duration
}
func defaultConfig() *cfg {
return &cfg{
perm: os.ModePerm, // 0777
log: &logger.Logger{Logger: zap.L()},
reportError: func(string, error) {},
badgerOptions: badger.DefaultOptions(""),
}
}
type Option func(*cfg)
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
func WithPath(p string) Option {
return func(c *cfg) {
c.path = p
}
}
func WithReadOnly(ro bool) Option {
return func(c *cfg) {
c.badgerOptions = c.badgerOptions.WithReadOnly(ro)
}
}
func WithPermissions(perm fs.FileMode) Option {
return func(c *cfg) {
c.perm = perm
}
}
func WithNumCompactors(v int) Option {
return func(c *cfg) {
c.badgerOptions.NumCompactors = v
}
}
func WithNumGoroutines(v int) Option {
return func(c *cfg) {
c.badgerOptions.NumGoroutines = v
}
}
func WithNumLevelZeroTables(v int) Option {
return func(c *cfg) {
c.badgerOptions.NumLevelZeroTables = v
}
}
func WithNumLevelZeroTablesStall(v int) Option {
return func(c *cfg) {
c.badgerOptions.NumLevelZeroTablesStall = v
}
}
func WithNumMemTables(v int) Option {
return func(c *cfg) {
c.badgerOptions.NumMemtables = v
}
}
func WithMemtableSize(v int64) Option {
return func(c *cfg) {
c.badgerOptions.MemTableSize = v
}
}
func WithMaxLevels(v int) Option {
return func(c *cfg) {
c.badgerOptions.MaxLevels = v
}
}
func WithLevelSizeMultiplier(v int) Option {
return func(c *cfg) {
c.badgerOptions.LevelSizeMultiplier = v
}
}
func WithValueLogMaxEntries(v uint32) Option {
return func(c *cfg) {
c.badgerOptions.ValueLogMaxEntries = v
}
}
func WithGCInterval(v time.Duration) Option {
return func(c *cfg) {
c.gcInterval = v
}
}