[#18] Add badgerstore substorage implementation #556

Closed
ale64bit wants to merge 1 commit from ale64bit/frostfs-node:feature/flat-blobstore-test into master
10 changed files with 607 additions and 5 deletions

View file

@@ -21,6 +21,7 @@ import (
contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
badgerstoreconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore"
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
@@ -32,6 +33,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
@@ -176,6 +178,20 @@ type subStorageCfg struct {
width uint64
leafWidth uint64
openedCacheSize int
// badgerstore-specific
badger struct {
numCompactors int
numGoroutines int
numLevelZeroTables int
numLevelZeroTablesStall int
numMemtables int
memtableSize int64
maxLevels int
levelSizeMultiplier int
valueLogMaxEntries uint32
gcInterval time.Duration
}
}
// readConfig fills applicationConfiguration with raw configuration values
@@ -291,6 +307,21 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
sCfg.width = sub.ShallowWidth()
sCfg.leafWidth = sub.LeafWidth()
sCfg.openedCacheSize = sub.OpenedCacheSize()
case badgerstore.Type:
sub := badgerstoreconfig.From((*config.Config)(storagesCfg[i]))
sCfg.badger.numCompactors = sub.NumCompactors()
sCfg.badger.numGoroutines = sub.NumGoroutines()
sCfg.badger.numLevelZeroTables = sub.NumLevelZeroTables()
sCfg.badger.numLevelZeroTablesStall = sub.NumLevelZeroTablesStall()
sCfg.badger.numMemtables = sub.NumMemtables()
sCfg.badger.memtableSize = sub.MemtableSize()
sCfg.badger.maxLevels = sub.MaxLevels()
sCfg.badger.levelSizeMultiplier = sub.LevelSizeMultiplier()
sCfg.badger.valueLogMaxEntries = sub.ValueLogMaxEntries()
sCfg.badger.gcInterval = sub.GCInterval()
case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth()
@@ -797,6 +828,27 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
})
case badgerstore.Type:
ss = append(ss, blobstor.SubStorage{
Storage: badgerstore.New(
badgerstore.WithPath(sRead.path),
badgerstore.WithPermissions(sRead.perm),
badgerstore.WithLogger(c.log),
badgerstore.WithNumCompactors(sRead.badger.numCompactors),
badgerstore.WithNumGoroutines(sRead.badger.numGoroutines),
badgerstore.WithNumLevelZeroTables(sRead.badger.numLevelZeroTables),
badgerstore.WithNumLevelZeroTablesStall(sRead.badger.numLevelZeroTablesStall),
badgerstore.WithNumMemTables(sRead.badger.numMemtables),
badgerstore.WithMemtableSize(sRead.badger.memtableSize),
badgerstore.WithMaxLevels(sRead.badger.maxLevels),
badgerstore.WithLevelSizeMultiplier(sRead.badger.levelSizeMultiplier),
badgerstore.WithValueLogMaxEntries(sRead.badger.valueLogMaxEntries),
badgerstore.WithGCInterval(sRead.badger.gcInterval),
),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
})
case fstree.Type:
fstreeOpts := []fstree.Option{
fstree.WithPath(sRead.path),

View file

@@ -0,0 +1,103 @@
package badgerstoreconfig
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"github.com/dgraph-io/badger/v4"
)
// Config is a wrapper over the config section
// which provides access to badger configuration.
// For information about specific parameters, see https://pkg.go.dev/github.com/dgraph-io/badger/v4.
type Config config.Config
var defaultOpts = badger.DefaultOptions("")
const (
DefaultGCInterval = 10 * time.Second
)
func From(c *config.Config) *Config { return (*Config)(c) }
func (x *Config) Type() string { return badgerstore.Type }
func (x *Config) NumCompactors() int {
s := int(config.IntSafe((*config.Config)(x), "num_compactors"))
if s > 0 {
return s
}
return defaultOpts.NumCompactors
}
func (x *Config) NumGoroutines() int {
s := int(config.IntSafe((*config.Config)(x), "num_goroutines"))
if s > 0 {
return s
}
return defaultOpts.NumGoroutines
}
func (x *Config) NumLevelZeroTables() int {
s := int(config.IntSafe((*config.Config)(x), "num_level_zero_tables"))
if s > 0 {
return s
}
return defaultOpts.NumLevelZeroTables
}
func (x *Config) NumLevelZeroTablesStall() int {
s := int(config.IntSafe((*config.Config)(x), "num_level_zero_tables_stall"))
if s > 0 {
return s
}
return defaultOpts.NumLevelZeroTablesStall
}
func (x *Config) NumMemtables() int {
s := int(config.IntSafe((*config.Config)(x), "num_memtables"))
if s > 0 {
return s
}
return defaultOpts.NumMemtables
}
func (x *Config) MemtableSize() int64 {
s := config.IntSafe((*config.Config)(x), "memtable_size")
if s > 0 {
return s
}
return defaultOpts.MemTableSize
}
func (x *Config) MaxLevels() int {
s := int(config.IntSafe((*config.Config)(x), "max_levels"))
if s > 0 {
return s
}
return defaultOpts.MaxLevels
}
func (x *Config) LevelSizeMultiplier() int {
s := int(config.IntSafe((*config.Config)(x), "level_size_multiplier"))
if s > 0 {
return s
}
return defaultOpts.LevelSizeMultiplier
}
func (x *Config) ValueLogMaxEntries() uint32 {
s := uint32(config.IntSafe((*config.Config)(x), "value_log_max_entries"))
if s > 0 {
return s
}
return defaultOpts.ValueLogMaxEntries
}
func (x *Config) GCInterval() time.Duration {
s := config.DurationSafe((*config.Config)(x), "gc_interval")
if s > 0 {
return s
}
return DefaultGCInterval
}

View file

@@ -1,10 +1,14 @@
package blobstorconfig
import (
"fmt"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/storage"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
)
// Config is a wrapper over the config section
@@ -23,11 +27,14 @@ func (x *Config) Storages() []*storage.Config {
typ := config.String(
(*config.Config)(x),
strconv.Itoa(i)+".type")
if typ == "" {
switch typ {
case "":
return ss
case fstree.Type, blobovniczatree.Type, badgerstore.Type:
sub := storage.From((*config.Config)(x).Sub(strconv.Itoa(i)))
ss = append(ss, sub)
default:
panic(fmt.Sprintf("invalid type: %q", typ))
}
sub := storage.From((*config.Config)(x).Sub(strconv.Itoa(i)))
ss = append(ss, sub)
}
}

View file

@@ -9,6 +9,7 @@ import (
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@@ -55,7 +56,7 @@ func validateConfig(c *config.Config) error {
}
for i := range blobstor {
switch blobstor[i].Type() {
case fstree.Type, blobovniczatree.Type:
case fstree.Type, blobovniczatree.Type, badgerstore.Type:
default:
return fmt.Errorf("unexpected storage type: %s (shard %d)", blobstor[i].Type(), shardNum)
}

View file

@@ -0,0 +1,201 @@
package badgerstore
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"github.com/dgraph-io/badger/v4"
)
const (
Type = "badgerstore"
)
type badgerstoreImpl struct {
*cfg
db *badger.DB
closeCh chan struct{}
}
func New(opts ...Option) common.Storage {
st := &badgerstoreImpl{
cfg: defaultConfig(),
}
for _, opt := range opts {
opt(st.cfg)
}
st.badgerOptions = st.badgerOptions.WithDir(st.cfg.path)
st.badgerOptions = st.badgerOptions.WithValueDir(st.cfg.path)
return st
}
func (s *badgerstoreImpl) Get(_ context.Context, req common.GetPrm) (common.GetRes, error) {
addrKey := addressKey(req.Address)
var data []byte
if err := s.db.View(func(tx *badger.Txn) error {
it, err := tx.Get(addrKey[:])
if err != nil {
return err
}
data, err = it.ValueCopy(nil)
return err
}); err != nil {
if err == badger.ErrKeyNotFound {
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return common.GetRes{}, err
}
var err error
if data, err = s.compression.Decompress(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
return common.GetRes{Object: obj, RawData: data}, nil
}
func (s *badgerstoreImpl) GetRange(ctx context.Context, req common.GetRangePrm) (common.GetRangeRes, error) {
getResp, err := s.Get(ctx, common.GetPrm{
Address: req.Address,
StorageID: req.StorageID,
})
if err != nil {
return common.GetRangeRes{}, err
}
payload := getResp.Object.Payload()
from := req.Range.GetOffset()
to := from + req.Range.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
}
return common.GetRangeRes{
Data: payload[from:to],
}, nil
}
func (s *badgerstoreImpl) Exists(_ context.Context, req common.ExistsPrm) (common.ExistsRes, error) {
addrKey := addressKey(req.Address)
exists := true
if err := s.db.View(func(tx *badger.Txn) error {
_, err := tx.Get(addrKey[:])
if err == badger.ErrKeyNotFound {
exists = false
return nil
}
return err
}); err != nil {
return common.ExistsRes{}, err
}
return common.ExistsRes{Exists: exists}, nil
}
func (s *badgerstoreImpl) Put(_ context.Context, req common.PutPrm) (common.PutRes, error) {
if s.badgerOptions.ReadOnly {
return common.PutRes{}, common.ErrReadOnly
}
if !req.DontCompress {
req.RawData = s.compression.Compress(req.RawData)
}
addrKey := addressKey(req.Address)
wb := s.db.NewWriteBatch()
defer wb.Cancel()
_ = wb.Set(addrKey[:], req.RawData)
return common.PutRes{}, wb.Flush()
}
func (s *badgerstoreImpl) Delete(_ context.Context, req common.DeletePrm) (common.DeleteRes, error) {
if s.badgerOptions.ReadOnly {
return common.DeleteRes{}, common.ErrReadOnly
}
addrKey := addressKey(req.Address)
err := s.db.Update(func(tx *badger.Txn) error {
if _, err := tx.Get(addrKey[:]); err != nil {

Please explain why `Get` first and only then `Delete`?

Our `Delete` must return `apistatus.ObjectNotFound`. But badger doesn't return `badger.ErrKeyNotFound` when deleting, so I call `Get` first to check that. (A minimal sketch of this behavior follows the `Delete` function below.)
return err
}
return tx.Delete(addrKey[:])
})
if err == badger.ErrKeyNotFound {
err = logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return common.DeleteRes{}, err
}
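A minimal standalone sketch of the behavior discussed in the review thread above, assuming an in-memory badger v4 instance (the key name is illustrative): `Txn.Delete` on an absent key reports no error, so only the preceding `Txn.Get` can surface `badger.ErrKeyNotFound` for mapping to `apistatus.ObjectNotFound`.

```go
package main

import (
    "fmt"

    "github.com/dgraph-io/badger/v4"
)

func main() {
    // In-memory instance; no on-disk directory is needed for this sketch.
    db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // Deleting a key that was never written: badger reports no error,
    // so a missing object cannot be detected from Delete alone.
    err = db.Update(func(tx *badger.Txn) error {
        return tx.Delete([]byte("missing-key"))
    })
    fmt.Println("Delete on missing key:", err) // <nil>

    // Get is what returns badger.ErrKeyNotFound for the same key.
    err = db.View(func(tx *badger.Txn) error {
        _, getErr := tx.Get([]byte("missing-key"))
        return getErr
    })
    fmt.Println("Get on missing key:", err) // Key not found
}
```

This matches the PR's approach of running `Get` inside the update transaction and wrapping the resulting `badger.ErrKeyNotFound` into `apistatus.ObjectNotFound`.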
func (s *badgerstoreImpl) iterateKeyValue(req common.IteratePrm, k, v []byte) error {
elem := common.IterationElement{
ObjectData: v,
}
if err := decodeAddress(k, &elem.Address); err != nil {
if req.IgnoreErrors {
return nil
}
return logicerr.Wrap(fmt.Errorf("(%T) decoding address string %q: %v", s, string(k), err))
}
var err error
if elem.ObjectData, err = s.compression.Decompress(elem.ObjectData); err != nil {
if req.IgnoreErrors {
if req.ErrorHandler != nil {
return req.ErrorHandler(elem.Address, err)
}
return nil
}
return logicerr.Wrap(fmt.Errorf("(%T) decompressing data for address %q: %v", s, elem.Address.String(), err))
}
switch {
case req.Handler != nil:
if err := req.Handler(elem); err != nil {
return err
}
case req.LazyHandler != nil:
if err := req.LazyHandler(elem.Address, func() ([]byte, error) { return elem.ObjectData, nil }); err != nil {
return err
}
default:
if !req.IgnoreErrors {
return logicerr.Wrap(fmt.Errorf("(%T) no Handler or LazyHandler set for IteratePrm", s))
}
}
return nil
}
func (s *badgerstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common.IterateRes, error) {
err := s.db.View(func(tx *badger.Txn) error {
it := tx.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
if err := it.Item().Value(func(val []byte) error {
return s.iterateKeyValue(req, it.Item().Key(), val)
}); err != nil {
return err
}
}
return nil
})
return common.IterateRes{}, err
}

View file

@@ -0,0 +1,23 @@
package badgerstore
import (
"path/filepath"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"go.uber.org/zap/zaptest"
)
func TestGeneric(t *testing.T) {
ctor := func(t *testing.T) common.Storage {
return New(
WithPath(filepath.Join(t.TempDir(), "badgerstore")),
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
WithGCInterval(10*time.Second),
)
}
blobstortest.TestAll(t, ctor, 2048, 16*1024)
}

View file

@@ -0,0 +1,49 @@
package badgerstore
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"github.com/dgraph-io/badger/v4"
)
func (s *badgerstoreImpl) Open(readOnly bool) (err error) {
s.badgerOptions = s.badgerOptions.WithReadOnly(readOnly)
s.db, err = badger.Open(s.badgerOptions)
return
}
func (s *badgerstoreImpl) gc() {
t := time.NewTicker(s.gcInterval)
defer t.Stop()
for {
select {
case <-s.closeCh:
return
case <-t.C:
for {
if err := s.db.RunValueLogGC(0.5); err != nil {
break
}
}
}
}
}
func (s *badgerstoreImpl) Init() error {
s.closeCh = make(chan struct{})
go s.gc()
return nil
}
func (s *badgerstoreImpl) Close() error {
close(s.closeCh)
return s.db.Close()
}
func (s *badgerstoreImpl) Type() string { return Type }
func (s *badgerstoreImpl) Path() string { return s.path }
func (s *badgerstoreImpl) SetCompressor(cc *compression.Config) { s.compression = cc }
func (s *badgerstoreImpl) Compressor() *compression.Config { return s.compression }
func (s *badgerstoreImpl) SetReportErrorFunc(f func(string, error)) { s.reportError = f }
func (s *badgerstoreImpl) SetParentID(string) {}

View file

@@ -0,0 +1,31 @@
package badgerstore
import (
"fmt"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type internalKey [len(cid.ID{}) + len(oid.ID{})]byte
func decodeAddress(k []byte, addr *oid.Address) error {
if got, want := len(k), len(internalKey{}); got != want {
return fmt.Errorf("unexpected internal key len: got %d, want %d", got, want)
}
var cnr cid.ID
var obj oid.ID
copy(cnr[:], k[:len(cnr)])
copy(obj[:], k[len(cnr):])
addr.SetContainer(cnr)
addr.SetObject(obj)
return nil
}
func addressKey(addr oid.Address) internalKey {
var key internalKey
cnr, obj := addr.Container(), addr.Object()
copy(key[:len(cnr)], cnr[:])
copy(key[len(cnr):], obj[:])
return key
}

View file

@@ -0,0 +1,18 @@
package badgerstore
import (
"testing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func TestInternalKey(t *testing.T) {
wantAddr := oidtest.Address()
k := addressKey(wantAddr)
var gotAddr oid.Address
require.NoError(t, decodeAddress(k[:], &gotAddr))
require.True(t, gotAddr.Equals(wantAddr))
}

View file

@@ -0,0 +1,117 @@
package badgerstore
import (
"io/fs"
"os"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/dgraph-io/badger/v4"
"go.uber.org/zap"
)
type cfg struct {
path string
perm fs.FileMode
log *logger.Logger
badgerOptions badger.Options
compression *compression.Config
reportError func(string, error)
gcInterval time.Duration
}
func defaultConfig() *cfg {
return &cfg{
perm: os.ModePerm, // 0777
log: &logger.Logger{Logger: zap.L()},
reportError: func(string, error) {},
badgerOptions: badger.DefaultOptions(""),
}
}
type Option func(*cfg)
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = l
}
}
func WithPath(p string) Option {
return func(c *cfg) {
c.path = p
}
}
func WithReadOnly(ro bool) Option {
return func(c *cfg) {
c.badgerOptions = c.badgerOptions.WithReadOnly(ro)
}
}
func WithPermissions(perm fs.FileMode) Option {
return func(c *cfg) {
c.perm = perm
}
}
func WithNumCompactors(v int) Option {
return func(c *cfg) {
c.badgerOptions.NumCompactors = v
}
}
func WithNumGoroutines(v int) Option {
return func(c *cfg) {
c.badgerOptions.NumGoroutines = v
}
}
func WithNumLevelZeroTables(v int) Option {
return func(c *cfg) {
c.badgerOptions.NumLevelZeroTables = v
}
}
func WithNumLevelZeroTablesStall(v int) Option {
return func(c *cfg) {
c.badgerOptions.NumLevelZeroTablesStall = v
}
}
func WithNumMemTables(v int) Option {
return func(c *cfg) {
c.badgerOptions.NumMemtables = v
}
}
func WithMemtableSize(v int64) Option {
return func(c *cfg) {
c.badgerOptions.MemTableSize = v
}
}
func WithMaxLevels(v int) Option {
return func(c *cfg) {
c.badgerOptions.MaxLevels = v
}
}
func WithLevelSizeMultiplier(v int) Option {
return func(c *cfg) {
c.badgerOptions.LevelSizeMultiplier = v
}
}
func WithValueLogMaxEntries(v uint32) Option {
return func(c *cfg) {
c.badgerOptions.ValueLogMaxEntries = v
}
}
func WithGCInterval(v time.Duration) Option {
return func(c *cfg) {
c.gcInterval = v
}
}