[#645] blobstor: Add Badger store
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
5b8200de88
commit
e9aed22454
14 changed files with 830 additions and 7 deletions
|
@ -21,6 +21,7 @@ import (
|
|||
contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
|
||||
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
||||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||
badgerstoreconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore"
|
||||
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
||||
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
||||
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
||||
|
@ -34,6 +35,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
|
@ -189,6 +191,14 @@ type subStorageCfg struct {
|
|||
openedCacheSize int
|
||||
initWorkerCount int
|
||||
initInAdvance bool
|
||||
|
||||
// badgerstore-specific
|
||||
indexCacheSize int64
|
||||
memTablesCount int
|
||||
compactorsCount int
|
||||
gcInterval time.Duration
|
||||
gcDiscardRatio float64
|
||||
valueLogFileSize int64
|
||||
rebuildDropTimeout time.Duration
|
||||
}
|
||||
|
||||
|
@ -317,6 +327,14 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
|
|||
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
|
||||
sCfg.depth = sub.Depth()
|
||||
sCfg.noSync = sub.NoSync()
|
||||
case badgerstore.Type:
|
||||
sub := badgerstoreconfig.From((*config.Config)(storagesCfg[i]))
|
||||
sCfg.indexCacheSize = sub.IndexCacheSize()
|
||||
sCfg.memTablesCount = sub.MemTablesCount()
|
||||
sCfg.compactorsCount = sub.CompactorsCount()
|
||||
sCfg.gcInterval = sub.GCInterval()
|
||||
sCfg.gcDiscardRatio = sub.GCDiscardRatio()
|
||||
sCfg.valueLogFileSize = sub.ValueLogFileSize()
|
||||
default:
|
||||
return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
|
||||
}
|
||||
|
@ -941,6 +959,23 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
|||
return true
|
||||
},
|
||||
})
|
||||
case badgerstore.Type:
|
||||
badgerStoreOpts := []badgerstore.Option{
|
||||
badgerstore.WithPath(sRead.path),
|
||||
badgerstore.WithPermissions(sRead.perm),
|
||||
badgerstore.WithCompactorsCount(sRead.compactorsCount),
|
||||
badgerstore.WithGCDiscardRatio(sRead.gcDiscardRatio),
|
||||
badgerstore.WithGCInterval(sRead.gcInterval),
|
||||
badgerstore.WithIndexCacheSize(sRead.indexCacheSize),
|
||||
badgerstore.WithMemTablesCount(sRead.memTablesCount),
|
||||
badgerstore.WithValueLogSize(sRead.valueLogFileSize),
|
||||
}
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: badgerstore.New(badgerStoreOpts...),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
||||
},
|
||||
})
|
||||
default:
|
||||
// should never happen, that has already
|
||||
// been handled: when the config was read
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
|
||||
)
|
||||
|
||||
type Config config.Config
|
||||
|
||||
const (
|
||||
IndexCacheSizeDefault = 256 << 20 // 256MB
|
||||
MemTablesCountDefault = 32
|
||||
CompactorsCountDefault = 64
|
||||
GCIntervalDefault = 10 * time.Minute
|
||||
GCDiscardRatioDefault = 0.2
|
||||
ValueLogSizeDefault = 1 << 30 // 1GB
|
||||
)
|
||||
|
||||
// From wraps config section into Config.
|
||||
func From(c *config.Config) *Config {
|
||||
return (*Config)(c)
|
||||
}
|
||||
|
||||
// Type returns the storage type.
|
||||
func (x *Config) Type() string {
|
||||
return badgerstore.Type
|
||||
}
|
||||
|
||||
// IndexCacheSize returns `index_cache_size` value or IndexCacheSizeDefault.
|
||||
func (x *Config) IndexCacheSize() int64 {
|
||||
s := config.SizeInBytesSafe((*config.Config)(x), "index_cache_size")
|
||||
if s > 0 {
|
||||
return int64(s)
|
||||
}
|
||||
|
||||
return IndexCacheSizeDefault
|
||||
}
|
||||
|
||||
// MemTablesCount returns `mem_tables_count` value or MemTablesCountDefault.
|
||||
func (x *Config) MemTablesCount() int {
|
||||
v := config.IntSafe((*config.Config)(x), "mem_tables_count")
|
||||
if v > 0 {
|
||||
return int(v)
|
||||
}
|
||||
return MemTablesCountDefault
|
||||
}
|
||||
|
||||
// CompactorsCount returns `compactors_count` value or CompactorsCountDefault.
|
||||
func (x *Config) CompactorsCount() int {
|
||||
v := config.IntSafe((*config.Config)(x), "compactors_count")
|
||||
if v > 0 {
|
||||
return int(v)
|
||||
}
|
||||
return CompactorsCountDefault
|
||||
}
|
||||
|
||||
// GCInterval returns `gc_interval` value or GCIntervalDefault.
|
||||
func (x *Config) GCInterval() time.Duration {
|
||||
v := config.DurationSafe((*config.Config)(x), "gc_interval")
|
||||
if v > 0 {
|
||||
return v
|
||||
}
|
||||
return GCIntervalDefault
|
||||
}
|
||||
|
||||
// GCDiscardRatio returns `gc_discard_percent` value as ratio value (in range (0.0; 1.0)) or GCDiscardRatioDefault.
|
||||
func (x *Config) GCDiscardRatio() float64 {
|
||||
v := config.Uint32Safe((*config.Config)(x), "gc_discard_percent")
|
||||
if v > 0 && v < 100 {
|
||||
return float64(v) / (float64(100))
|
||||
}
|
||||
return GCDiscardRatioDefault
|
||||
}
|
||||
|
||||
// ValueLogFileSize returns `value_log_file_size` value or ValueLogSizeDefault.
|
||||
func (x *Config) ValueLogFileSize() int64 {
|
||||
s := config.SizeInBytesSafe((*config.Config)(x), "value_log_file_size")
|
||||
if s > 0 {
|
||||
return int64(s)
|
||||
}
|
||||
|
||||
return ValueLogSizeDefault
|
||||
}
|
|
@ -9,6 +9,7 @@ import (
|
|||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
||||
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
|
@ -60,7 +61,7 @@ func validateConfig(c *config.Config) error {
|
|||
}
|
||||
for i := range blobstor {
|
||||
switch blobstor[i].Type() {
|
||||
case fstree.Type, blobovniczatree.Type:
|
||||
case fstree.Type, blobovniczatree.Type, badgerstore.Type:
|
||||
default:
|
||||
return fmt.Errorf("unexpected storage type: %s (shard %d)", blobstor[i].Type(), shardNum)
|
||||
}
|
||||
|
|
117
pkg/local_object_storage/blobstor/badgerstore/config.go
Normal file
117
pkg/local_object_storage/blobstor/badgerstore/config.go
Normal file
|
@ -0,0 +1,117 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/dgraph-io/badger/v4/options"
|
||||
)
|
||||
|
||||
type cfg struct {
|
||||
permissions fs.FileMode
|
||||
compression *compression.Config
|
||||
db badger.Options
|
||||
gcTimeout time.Duration
|
||||
gcDiscardRatio float64
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
// defaultCfg creates default options to create Store.
|
||||
// Default Badger options:
|
||||
// BaseTableSize: 2MB
|
||||
// BaseLevelSize: 10MB
|
||||
// TableSizeMultiplier: 2
|
||||
// LevelSizeMultiplier: 10
|
||||
// MaxLevels: 7
|
||||
// NumLevelZeroTables: 5
|
||||
// ValueLogFileSize: 1GB
|
||||
//
|
||||
// Badger flushes MemTable directly to Level0.
|
||||
// So for Level0 MemTableSize is used as TableSize https://github.com/dgraph-io/badger/blob/v4.1.0/levels.go#L403.
|
||||
// There is no total size limit for Level0, only NumLevelZeroTables
|
||||
//
|
||||
// Badger uses Dynamic Level Sizes like RocksDB.
|
||||
// See https://github.com/facebook/rocksdb/blob/v3.11/include/rocksdb/options.h#L366 for explanation.
|
||||
func defaultCfg() *cfg {
|
||||
opts := badger.DefaultOptions("/")
|
||||
opts.BlockCacheSize = 0 // compression and encryption are disabled, so block cache should be disabled
|
||||
opts.IndexCacheSize = 256 << 20 // 256MB, to not to keep all indicies in memory
|
||||
opts.Compression = options.None // performed by cfg.compressor
|
||||
opts.Logger = nil
|
||||
opts.MetricsEnabled = false
|
||||
opts.NumLevelZeroTablesStall = math.MaxInt // to not to stall because of Level0 slow compaction
|
||||
opts.NumMemtables = 32 // default memtable size is 64MB, so max memory consumption will be 2GB before stall
|
||||
opts.NumCompactors = 64
|
||||
opts.SyncWrites = true
|
||||
opts.ValueLogMaxEntries = math.MaxUint32 // default vLog file size is 1GB, so size is more clear than entries count
|
||||
opts.ValueThreshold = 0 // to store all values in vLog
|
||||
opts.LmaxCompaction = true
|
||||
|
||||
return &cfg{
|
||||
permissions: 0o700,
|
||||
db: opts,
|
||||
gcTimeout: 10 * time.Minute,
|
||||
gcDiscardRatio: 0.2, // for 1GB vLog file GC will perform only if around 200MB could be free
|
||||
}
|
||||
}
|
||||
|
||||
// WithPath sets BadgerStore directory.
|
||||
func WithPath(dir string) Option {
|
||||
return func(c *cfg) {
|
||||
c.db.Dir = dir
|
||||
c.db.ValueDir = dir
|
||||
}
|
||||
}
|
||||
|
||||
// WithPermissions sets persmission flags.
|
||||
func WithPermissions(p fs.FileMode) Option {
|
||||
return func(c *cfg) {
|
||||
c.permissions = p
|
||||
}
|
||||
}
|
||||
|
||||
// WithIndexCacheSize sets BadgerStore index cache size.
|
||||
func WithIndexCacheSize(sz int64) Option {
|
||||
return func(c *cfg) {
|
||||
c.db.IndexCacheSize = sz
|
||||
}
|
||||
}
|
||||
|
||||
// WithMemTablesCount sets maximum count of memtables.
|
||||
func WithMemTablesCount(count int) Option {
|
||||
return func(c *cfg) {
|
||||
c.db.NumMemtables = count
|
||||
}
|
||||
}
|
||||
|
||||
// WithCompactorsCount sets count of concurrent compactors.
|
||||
func WithCompactorsCount(count int) Option {
|
||||
return func(c *cfg) {
|
||||
c.db.NumCompactors = count
|
||||
}
|
||||
}
|
||||
|
||||
// WithGCInterval sets GC interval value.
|
||||
func WithGCInterval(d time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.gcTimeout = d
|
||||
}
|
||||
}
|
||||
|
||||
// WithGCDiscardRatio sets GC discard ratio.
|
||||
func WithGCDiscardRatio(r float64) Option {
|
||||
return func(c *cfg) {
|
||||
c.gcDiscardRatio = r
|
||||
}
|
||||
}
|
||||
|
||||
// WithValueLogSize sets max value log size.
|
||||
func WithValueLogSize(sz int64) Option {
|
||||
return func(c *cfg) {
|
||||
c.db.ValueLogFileSize = sz
|
||||
}
|
||||
}
|
93
pkg/local_object_storage/blobstor/badgerstore/control.go
Normal file
93
pkg/local_object_storage/blobstor/badgerstore/control.go
Normal file
|
@ -0,0 +1,93 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
var errStoreMustBeOpenedBeforeInit = errors.New("store must be opened before initialization")
|
||||
|
||||
// Close implements common.Storage.
|
||||
func (s *Store) Close() error {
|
||||
s.modeMtx.Lock()
|
||||
defer s.modeMtx.Unlock()
|
||||
|
||||
if !s.opened {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.gcCancel != nil {
|
||||
s.gcCancel()
|
||||
}
|
||||
s.wg.Wait()
|
||||
|
||||
if err := s.db.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
s.opened = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init implements common.Storage.
|
||||
func (s *Store) Init() error {
|
||||
s.modeMtx.Lock()
|
||||
defer s.modeMtx.Unlock()
|
||||
|
||||
if !s.opened {
|
||||
return errStoreMustBeOpenedBeforeInit
|
||||
}
|
||||
|
||||
s.startGC()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) startGC() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.gcCancel = cancel
|
||||
|
||||
t := time.NewTicker(s.cfg.gcTimeout)
|
||||
s.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
if err := s.db.RunValueLogGC(s.cfg.gcDiscardRatio); err == nil {
|
||||
_ = s.db.RunValueLogGC(s.cfg.gcDiscardRatio) // see https://dgraph.io/docs/badger/get-started/#garbage-collection
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Open implements common.Storage.
|
||||
func (s *Store) Open(readOnly bool) error {
|
||||
s.modeMtx.Lock()
|
||||
defer s.modeMtx.Unlock()
|
||||
|
||||
if s.opened {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := util.MkdirAllX(s.cfg.db.Dir, s.cfg.permissions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.cfg.db.ReadOnly = readOnly
|
||||
if s.db, err = badger.Open(s.cfg.db); err != nil {
|
||||
return err
|
||||
}
|
||||
s.opened = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) readOnly() bool {
|
||||
return s.cfg.db.ReadOnly
|
||||
}
|
44
pkg/local_object_storage/blobstor/badgerstore/delete.go
Normal file
44
pkg/local_object_storage/blobstor/badgerstore/delete.go
Normal file
|
@ -0,0 +1,44 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Delete implements common.Storage.
|
||||
func (s *Store) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Delete",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if s.readOnly() {
|
||||
return common.DeleteRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
tx := s.db.NewTransaction(true)
|
||||
defer tx.Discard()
|
||||
|
||||
_, err := tx.Get(key(prm.Address))
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
|
||||
err = tx.Delete(key(prm.Address))
|
||||
if err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
return common.DeleteRes{}, tx.Commit()
|
||||
}
|
36
pkg/local_object_storage/blobstor/badgerstore/exists.go
Normal file
36
pkg/local_object_storage/blobstor/badgerstore/exists.go
Normal file
|
@ -0,0 +1,36 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Exists implements common.Storage.
|
||||
func (s *Store) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Exists",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
tx := s.db.NewTransaction(false)
|
||||
defer tx.Discard()
|
||||
|
||||
_, err := tx.Get(key(prm.Address))
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return common.ExistsRes{Exists: false}, nil
|
||||
}
|
||||
return common.ExistsRes{}, err
|
||||
}
|
||||
|
||||
return common.ExistsRes{Exists: true}, nil
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
||||
)
|
||||
|
||||
func TestGeneric(t *testing.T) {
|
||||
const maxObjectSize = 1 << 16
|
||||
|
||||
helper := func(t *testing.T, dir string) common.Storage {
|
||||
return New(WithPath(dir))
|
||||
}
|
||||
|
||||
newStore := func(t *testing.T) common.Storage {
|
||||
return helper(t, t.TempDir())
|
||||
}
|
||||
|
||||
blobstortest.TestAll(t, newStore, 1024, maxObjectSize)
|
||||
|
||||
t.Run("info", func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
blobstortest.TestInfo(t, func(t *testing.T) common.Storage {
|
||||
return helper(t, dir)
|
||||
}, Type, dir)
|
||||
})
|
||||
}
|
||||
|
||||
func TestControl(t *testing.T) {
|
||||
const maxObjectSize = 2048
|
||||
|
||||
newStore := func(t *testing.T) common.Storage {
|
||||
return New(WithPath(t.TempDir()))
|
||||
}
|
||||
|
||||
blobstortest.TestControl(t, newStore, 1024, maxObjectSize)
|
||||
}
|
107
pkg/local_object_storage/blobstor/badgerstore/get.go
Normal file
107
pkg/local_object_storage/blobstor/badgerstore/get.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Get implements common.Storage.
|
||||
func (s *Store) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Get",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
|
||||
attribute.Bool("raw", prm.Raw),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
data, err := s.getObjectData(prm.Address)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
|
||||
data, err = s.cfg.compression.Decompress(data)
|
||||
if err != nil {
|
||||
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
}
|
||||
|
||||
return common.GetRes{Object: obj, RawData: data}, nil
|
||||
}
|
||||
|
||||
// GetRange implements common.Storage.
|
||||
func (s *Store) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.GetRange",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
|
||||
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
|
||||
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
data, err := s.getObjectData(prm.Address)
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, err
|
||||
}
|
||||
|
||||
data, err = s.cfg.compression.Decompress(data)
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
}
|
||||
|
||||
from := prm.Range.GetOffset()
|
||||
to := from + prm.Range.GetLength()
|
||||
payload := obj.Payload()
|
||||
|
||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
|
||||
}
|
||||
|
||||
return common.GetRangeRes{
|
||||
Data: payload[from:to],
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Store) getObjectData(addr oid.Address) ([]byte, error) {
|
||||
var data []byte
|
||||
tx := s.db.NewTransaction(false)
|
||||
defer tx.Discard()
|
||||
|
||||
item, err := tx.Get(key(addr))
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err = item.ValueCopy(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
113
pkg/local_object_storage/blobstor/badgerstore/iterate.go
Normal file
113
pkg/local_object_storage/blobstor/badgerstore/iterate.go
Normal file
|
@ -0,0 +1,113 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Iterate implements common.Storage.
|
||||
func (s *Store) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Iterate",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.Bool("ignore_errors", prm.IgnoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var last []byte
|
||||
opts := badger.DefaultIteratorOptions
|
||||
batch := make([]keyValue, 0, opts.PrefetchSize)
|
||||
opts.PrefetchSize++ // to skip last
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return common.IterateRes{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
batch = batch[:0]
|
||||
err := s.db.View(func(tx *badger.Txn) error {
|
||||
it := tx.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
for it.Seek(last); it.Valid(); it.Next() {
|
||||
if bytes.Equal(last, it.Item().Key()) {
|
||||
continue
|
||||
}
|
||||
|
||||
var kv keyValue
|
||||
var err error
|
||||
kv.key = it.Item().KeyCopy(nil)
|
||||
kv.value, err = it.Item().ValueCopy(nil)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
batch = append(batch, kv)
|
||||
last = kv.key
|
||||
if len(batch) == opts.PrefetchSize-1 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return common.IterateRes{}, err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return common.IterateRes{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
if err := s.iterateBatch(batch, prm); err != nil {
|
||||
return common.IterateRes{}, err
|
||||
}
|
||||
}
|
||||
|
||||
return common.IterateRes{}, nil
|
||||
}
|
||||
|
||||
func (s *Store) iterateBatch(batch []keyValue, prm common.IteratePrm) error {
|
||||
for _, kv := range batch {
|
||||
addr, err := address(kv.key)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
continue
|
||||
}
|
||||
}
|
||||
data, err := s.cfg.compression.Decompress(kv.value)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("could not decompress object data: %w", err)
|
||||
}
|
||||
|
||||
if err := prm.Handler(common.IterationElement{
|
||||
Address: addr,
|
||||
ObjectData: data,
|
||||
StorageID: defaultStorageID,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type keyValue struct {
|
||||
key, value []byte
|
||||
}
|
33
pkg/local_object_storage/blobstor/badgerstore/keys.go
Normal file
33
pkg/local_object_storage/blobstor/badgerstore/keys.go
Normal file
|
@ -0,0 +1,33 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
const (
|
||||
keyLength = 64
|
||||
objectIDOffset = 32
|
||||
)
|
||||
|
||||
func key(add oid.Address) []byte {
|
||||
res := make([]byte, keyLength)
|
||||
add.Container().Encode(res)
|
||||
add.Object().Encode(res[objectIDOffset:])
|
||||
return res
|
||||
}
|
||||
|
||||
func address(k []byte) (oid.Address, error) {
|
||||
var res oid.Address
|
||||
var containerID cid.ID
|
||||
var objectID oid.ID
|
||||
if err := containerID.Decode(k[:objectIDOffset]); err != nil {
|
||||
return res, err
|
||||
}
|
||||
if err := objectID.Decode(k[objectIDOffset:]); err != nil {
|
||||
return res, err
|
||||
}
|
||||
res.SetContainer(containerID)
|
||||
res.SetObject(objectID)
|
||||
return res, nil
|
||||
}
|
40
pkg/local_object_storage/blobstor/badgerstore/put.go
Normal file
40
pkg/local_object_storage/blobstor/badgerstore/put.go
Normal file
|
@ -0,0 +1,40 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
var defaultStorageID = []byte("badger")
|
||||
|
||||
// Put implements common.Storage.
|
||||
func (s *Store) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Put",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.Bool("dont_compress", prm.DontCompress),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if s.readOnly() {
|
||||
return common.PutRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
if !prm.DontCompress {
|
||||
prm.RawData = s.cfg.compression.Compress(prm.RawData)
|
||||
}
|
||||
|
||||
tx := s.db.NewTransaction(true)
|
||||
defer tx.Discard()
|
||||
|
||||
err := tx.Set(key(prm.Address), prm.RawData)
|
||||
if err != nil {
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
return common.PutRes{StorageID: defaultStorageID}, tx.Commit()
|
||||
}
|
71
pkg/local_object_storage/blobstor/badgerstore/store.go
Normal file
71
pkg/local_object_storage/blobstor/badgerstore/store.go
Normal file
|
@ -0,0 +1,71 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
const (
|
||||
Type = "badgerstore"
|
||||
)
|
||||
|
||||
var _ common.Storage = (*Store)(nil)
|
||||
|
||||
type Store struct {
|
||||
cfg *cfg
|
||||
db *badger.DB
|
||||
|
||||
modeMtx *sync.Mutex // protects fields in group below
|
||||
opened bool
|
||||
gcCancel context.CancelFunc
|
||||
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
// New returns new Store instance with opts applied.
|
||||
func New(opts ...Option) *Store {
|
||||
s := &Store{
|
||||
cfg: defaultCfg(),
|
||||
modeMtx: &sync.Mutex{},
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(s.cfg)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Compressor implements common.Storage.
|
||||
func (s *Store) Compressor() *compression.Config {
|
||||
return s.cfg.compression
|
||||
}
|
||||
|
||||
// Path implements common.Storage.
|
||||
func (s *Store) Path() string {
|
||||
return s.cfg.db.Dir
|
||||
}
|
||||
|
||||
// SetCompressor implements common.Storage.
|
||||
func (s *Store) SetCompressor(cc *compression.Config) {
|
||||
s.cfg.compression = cc
|
||||
}
|
||||
|
||||
// SetParentID implements common.Storage.
|
||||
func (*Store) SetParentID(parentID string) {}
|
||||
|
||||
// SetReportErrorFunc implements common.Storage.
|
||||
func (*Store) SetReportErrorFunc(func(string, error)) {}
|
||||
|
||||
// Type implements common.Storage.
|
||||
func (*Store) Type() string {
|
||||
return Type
|
||||
}
|
||||
|
||||
// Rebuild implements common.Storage.
|
||||
func (*Store) Rebuild(context.Context, common.RebuildPrm) (common.RebuildRes, error) {
|
||||
return common.RebuildRes{}, nil
|
||||
}
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
|
@ -77,6 +78,14 @@ var storages = []storage{
|
|||
)
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "badger",
|
||||
create: func(dir string) common.Storage {
|
||||
return badgerstore.New(
|
||||
badgerstore.WithPath(dir),
|
||||
)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func BenchmarkSubstorageReadPerf(b *testing.B) {
|
||||
|
|
Loading…
Reference in a new issue