From e9aed2245485a9927662113ee415a1aab6bfc228 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 25 Oct 2023 18:50:50 +0300 Subject: [PATCH] [#645] blobstor: Add Badger store Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 47 ++++++- .../shard/blobstor/badgerstore/config.go | 85 +++++++++++++ cmd/frostfs-node/validate.go | 3 +- .../blobstor/badgerstore/config.go | 117 ++++++++++++++++++ .../blobstor/badgerstore/control.go | 93 ++++++++++++++ .../blobstor/badgerstore/delete.go | 44 +++++++ .../blobstor/badgerstore/exists.go | 36 ++++++ .../blobstor/badgerstore/generic_test.go | 39 ++++++ .../blobstor/badgerstore/get.go | 107 ++++++++++++++++ .../blobstor/badgerstore/iterate.go | 113 +++++++++++++++++ .../blobstor/badgerstore/keys.go | 33 +++++ .../blobstor/badgerstore/put.go | 40 ++++++ .../blobstor/badgerstore/store.go | 71 +++++++++++ .../blobstor/perf_test.go | 9 ++ 14 files changed, 830 insertions(+), 7 deletions(-) create mode 100644 cmd/frostfs-node/config/engine/shard/blobstor/badgerstore/config.go create mode 100644 pkg/local_object_storage/blobstor/badgerstore/config.go create mode 100644 pkg/local_object_storage/blobstor/badgerstore/control.go create mode 100644 pkg/local_object_storage/blobstor/badgerstore/delete.go create mode 100644 pkg/local_object_storage/blobstor/badgerstore/exists.go create mode 100644 pkg/local_object_storage/blobstor/badgerstore/generic_test.go create mode 100644 pkg/local_object_storage/blobstor/badgerstore/get.go create mode 100644 pkg/local_object_storage/blobstor/badgerstore/iterate.go create mode 100644 pkg/local_object_storage/blobstor/badgerstore/keys.go create mode 100644 pkg/local_object_storage/blobstor/badgerstore/put.go create mode 100644 pkg/local_object_storage/blobstor/badgerstore/store.go diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index d78a90cfc..7a3b6aa10 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -21,6 +21,7 @@ import ( contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts" engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine" shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" + badgerstoreconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore" blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza" fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree" loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger" @@ -34,6 +35,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" @@ -183,12 +185,20 @@ type subStorageCfg struct { noSync bool // blobovnicza-specific - size uint64 - width uint64 - leafWidth uint64 - openedCacheSize int - initWorkerCount int - initInAdvance bool + size uint64 + width uint64 + leafWidth uint64 + openedCacheSize int + initWorkerCount int + initInAdvance bool + + // badgerstore-specific + indexCacheSize int64 + memTablesCount int + compactorsCount int + gcInterval time.Duration + gcDiscardRatio float64 + valueLogFileSize int64 rebuildDropTimeout time.Duration } @@ -317,6 +327,14 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sCfg.depth = sub.Depth() sCfg.noSync = sub.NoSync() + case badgerstore.Type: + sub := badgerstoreconfig.From((*config.Config)(storagesCfg[i])) + sCfg.indexCacheSize = sub.IndexCacheSize() + sCfg.memTablesCount = sub.MemTablesCount() + sCfg.compactorsCount = sub.CompactorsCount() + sCfg.gcInterval = sub.GCInterval() + sCfg.gcDiscardRatio = sub.GCDiscardRatio() + sCfg.valueLogFileSize = sub.ValueLogFileSize() default: return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type()) } @@ -941,6 +959,23 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage { return true }, }) + case badgerstore.Type: + badgerStoreOpts := []badgerstore.Option{ + badgerstore.WithPath(sRead.path), + badgerstore.WithPermissions(sRead.perm), + badgerstore.WithCompactorsCount(sRead.compactorsCount), + badgerstore.WithGCDiscardRatio(sRead.gcDiscardRatio), + badgerstore.WithGCInterval(sRead.gcInterval), + badgerstore.WithIndexCacheSize(sRead.indexCacheSize), + badgerstore.WithMemTablesCount(sRead.memTablesCount), + badgerstore.WithValueLogSize(sRead.valueLogFileSize), + } + ss = append(ss, blobstor.SubStorage{ + Storage: badgerstore.New(badgerStoreOpts...), + Policy: func(_ *objectSDK.Object, data []byte) bool { + return uint64(len(data)) < shCfg.smallSizeObjectLimit + }, + }) default: // should never happen, that has already // been handled: when the config was read diff --git a/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore/config.go b/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore/config.go new file mode 100644 index 000000000..b80324d4e --- /dev/null +++ b/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore/config.go @@ -0,0 +1,85 @@ +package badgerstore + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore" +) + +type Config config.Config + +const ( + IndexCacheSizeDefault = 256 << 20 // 256MB + MemTablesCountDefault = 32 + CompactorsCountDefault = 64 + GCIntervalDefault = 10 * time.Minute + GCDiscardRatioDefault = 0.2 + ValueLogSizeDefault = 1 << 30 // 1GB +) + +// From wraps config section into Config. +func From(c *config.Config) *Config { + return (*Config)(c) +} + +// Type returns the storage type. +func (x *Config) Type() string { + return badgerstore.Type +} + +// IndexCacheSize returns `index_cache_size` value or IndexCacheSizeDefault. +func (x *Config) IndexCacheSize() int64 { + s := config.SizeInBytesSafe((*config.Config)(x), "index_cache_size") + if s > 0 { + return int64(s) + } + + return IndexCacheSizeDefault +} + +// MemTablesCount returns `mem_tables_count` value or MemTablesCountDefault. +func (x *Config) MemTablesCount() int { + v := config.IntSafe((*config.Config)(x), "mem_tables_count") + if v > 0 { + return int(v) + } + return MemTablesCountDefault +} + +// CompactorsCount returns `compactors_count` value or CompactorsCountDefault. +func (x *Config) CompactorsCount() int { + v := config.IntSafe((*config.Config)(x), "compactors_count") + if v > 0 { + return int(v) + } + return CompactorsCountDefault +} + +// GCInterval returns `gc_interval` value or GCIntervalDefault. +func (x *Config) GCInterval() time.Duration { + v := config.DurationSafe((*config.Config)(x), "gc_interval") + if v > 0 { + return v + } + return GCIntervalDefault +} + +// GCDiscardRatio returns `gc_discard_percent` value as ratio value (in range (0.0; 1.0)) or GCDiscardRatioDefault. +func (x *Config) GCDiscardRatio() float64 { + v := config.Uint32Safe((*config.Config)(x), "gc_discard_percent") + if v > 0 && v < 100 { + return float64(v) / (float64(100)) + } + return GCDiscardRatioDefault +} + +// ValueLogFileSize returns `value_log_file_size` value or ValueLogSizeDefault. +func (x *Config) ValueLogFileSize() int64 { + s := config.SizeInBytesSafe((*config.Config)(x), "value_log_file_size") + if s > 0 { + return int64(s) + } + + return ValueLogSizeDefault +} diff --git a/cmd/frostfs-node/validate.go b/cmd/frostfs-node/validate.go index ae52b9e4a..dde895692 100644 --- a/cmd/frostfs-node/validate.go +++ b/cmd/frostfs-node/validate.go @@ -9,6 +9,7 @@ import ( shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger" treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" @@ -60,7 +61,7 @@ func validateConfig(c *config.Config) error { } for i := range blobstor { switch blobstor[i].Type() { - case fstree.Type, blobovniczatree.Type: + case fstree.Type, blobovniczatree.Type, badgerstore.Type: default: return fmt.Errorf("unexpected storage type: %s (shard %d)", blobstor[i].Type(), shardNum) } diff --git a/pkg/local_object_storage/blobstor/badgerstore/config.go b/pkg/local_object_storage/blobstor/badgerstore/config.go new file mode 100644 index 000000000..72a9f51bb --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/config.go @@ -0,0 +1,117 @@ +package badgerstore + +import ( + "io/fs" + "math" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" + "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/badger/v4/options" +) + +type cfg struct { + permissions fs.FileMode + compression *compression.Config + db badger.Options + gcTimeout time.Duration + gcDiscardRatio float64 +} + +type Option func(*cfg) + +// defaultCfg creates default options to create Store. +// Default Badger options: +// BaseTableSize: 2MB +// BaseLevelSize: 10MB +// TableSizeMultiplier: 2 +// LevelSizeMultiplier: 10 +// MaxLevels: 7 +// NumLevelZeroTables: 5 +// ValueLogFileSize: 1GB +// +// Badger flushes MemTable directly to Level0. +// So for Level0 MemTableSize is used as TableSize https://github.com/dgraph-io/badger/blob/v4.1.0/levels.go#L403. +// There is no total size limit for Level0, only NumLevelZeroTables +// +// Badger uses Dynamic Level Sizes like RocksDB. +// See https://github.com/facebook/rocksdb/blob/v3.11/include/rocksdb/options.h#L366 for explanation. +func defaultCfg() *cfg { + opts := badger.DefaultOptions("/") + opts.BlockCacheSize = 0 // compression and encryption are disabled, so block cache should be disabled + opts.IndexCacheSize = 256 << 20 // 256MB, to not to keep all indicies in memory + opts.Compression = options.None // performed by cfg.compressor + opts.Logger = nil + opts.MetricsEnabled = false + opts.NumLevelZeroTablesStall = math.MaxInt // to not to stall because of Level0 slow compaction + opts.NumMemtables = 32 // default memtable size is 64MB, so max memory consumption will be 2GB before stall + opts.NumCompactors = 64 + opts.SyncWrites = true + opts.ValueLogMaxEntries = math.MaxUint32 // default vLog file size is 1GB, so size is more clear than entries count + opts.ValueThreshold = 0 // to store all values in vLog + opts.LmaxCompaction = true + + return &cfg{ + permissions: 0o700, + db: opts, + gcTimeout: 10 * time.Minute, + gcDiscardRatio: 0.2, // for 1GB vLog file GC will perform only if around 200MB could be free + } +} + +// WithPath sets BadgerStore directory. +func WithPath(dir string) Option { + return func(c *cfg) { + c.db.Dir = dir + c.db.ValueDir = dir + } +} + +// WithPermissions sets persmission flags. +func WithPermissions(p fs.FileMode) Option { + return func(c *cfg) { + c.permissions = p + } +} + +// WithIndexCacheSize sets BadgerStore index cache size. +func WithIndexCacheSize(sz int64) Option { + return func(c *cfg) { + c.db.IndexCacheSize = sz + } +} + +// WithMemTablesCount sets maximum count of memtables. +func WithMemTablesCount(count int) Option { + return func(c *cfg) { + c.db.NumMemtables = count + } +} + +// WithCompactorsCount sets count of concurrent compactors. +func WithCompactorsCount(count int) Option { + return func(c *cfg) { + c.db.NumCompactors = count + } +} + +// WithGCInterval sets GC interval value. +func WithGCInterval(d time.Duration) Option { + return func(c *cfg) { + c.gcTimeout = d + } +} + +// WithGCDiscardRatio sets GC discard ratio. +func WithGCDiscardRatio(r float64) Option { + return func(c *cfg) { + c.gcDiscardRatio = r + } +} + +// WithValueLogSize sets max value log size. +func WithValueLogSize(sz int64) Option { + return func(c *cfg) { + c.db.ValueLogFileSize = sz + } +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/control.go b/pkg/local_object_storage/blobstor/badgerstore/control.go new file mode 100644 index 000000000..2b808d97b --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/control.go @@ -0,0 +1,93 @@ +package badgerstore + +import ( + "context" + "errors" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + "github.com/dgraph-io/badger/v4" +) + +var errStoreMustBeOpenedBeforeInit = errors.New("store must be opened before initialization") + +// Close implements common.Storage. +func (s *Store) Close() error { + s.modeMtx.Lock() + defer s.modeMtx.Unlock() + + if !s.opened { + return nil + } + + if s.gcCancel != nil { + s.gcCancel() + } + s.wg.Wait() + + if err := s.db.Close(); err != nil { + return err + } + s.opened = false + return nil +} + +// Init implements common.Storage. +func (s *Store) Init() error { + s.modeMtx.Lock() + defer s.modeMtx.Unlock() + + if !s.opened { + return errStoreMustBeOpenedBeforeInit + } + + s.startGC() + + return nil +} + +func (s *Store) startGC() { + ctx, cancel := context.WithCancel(context.Background()) + s.gcCancel = cancel + + t := time.NewTicker(s.cfg.gcTimeout) + s.wg.Add(1) + + go func() { + defer s.wg.Done() + + select { + case <-ctx.Done(): + return + case <-t.C: + if err := s.db.RunValueLogGC(s.cfg.gcDiscardRatio); err == nil { + _ = s.db.RunValueLogGC(s.cfg.gcDiscardRatio) // see https://dgraph.io/docs/badger/get-started/#garbage-collection + } + } + }() +} + +// Open implements common.Storage. +func (s *Store) Open(readOnly bool) error { + s.modeMtx.Lock() + defer s.modeMtx.Unlock() + + if s.opened { + return nil + } + + err := util.MkdirAllX(s.cfg.db.Dir, s.cfg.permissions) + if err != nil { + return err + } + s.cfg.db.ReadOnly = readOnly + if s.db, err = badger.Open(s.cfg.db); err != nil { + return err + } + s.opened = true + return nil +} + +func (s *Store) readOnly() bool { + return s.cfg.db.ReadOnly +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/delete.go b/pkg/local_object_storage/blobstor/badgerstore/delete.go new file mode 100644 index 000000000..0bad88117 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/delete.go @@ -0,0 +1,44 @@ +package badgerstore + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "github.com/dgraph-io/badger/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Delete implements common.Storage. +func (s *Store) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) { + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Delete", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.String("address", prm.Address.EncodeToString()), + )) + defer span.End() + + if s.readOnly() { + return common.DeleteRes{}, common.ErrReadOnly + } + + tx := s.db.NewTransaction(true) + defer tx.Discard() + + _, err := tx.Get(key(prm.Address)) + if err != nil { + if err == badger.ErrKeyNotFound { + return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return common.DeleteRes{}, err + } + + err = tx.Delete(key(prm.Address)) + if err != nil { + return common.DeleteRes{}, err + } + return common.DeleteRes{}, tx.Commit() +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/exists.go b/pkg/local_object_storage/blobstor/badgerstore/exists.go new file mode 100644 index 000000000..5bf879f82 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/exists.go @@ -0,0 +1,36 @@ +package badgerstore + +import ( + "context" + "encoding/hex" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "github.com/dgraph-io/badger/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Exists implements common.Storage. +func (s *Store) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) { + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Exists", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + )) + defer span.End() + + tx := s.db.NewTransaction(false) + defer tx.Discard() + + _, err := tx.Get(key(prm.Address)) + if err != nil { + if err == badger.ErrKeyNotFound { + return common.ExistsRes{Exists: false}, nil + } + return common.ExistsRes{}, err + } + + return common.ExistsRes{Exists: true}, nil +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/generic_test.go b/pkg/local_object_storage/blobstor/badgerstore/generic_test.go new file mode 100644 index 000000000..d34981471 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/generic_test.go @@ -0,0 +1,39 @@ +package badgerstore + +import ( + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest" +) + +func TestGeneric(t *testing.T) { + const maxObjectSize = 1 << 16 + + helper := func(t *testing.T, dir string) common.Storage { + return New(WithPath(dir)) + } + + newStore := func(t *testing.T) common.Storage { + return helper(t, t.TempDir()) + } + + blobstortest.TestAll(t, newStore, 1024, maxObjectSize) + + t.Run("info", func(t *testing.T) { + dir := t.TempDir() + blobstortest.TestInfo(t, func(t *testing.T) common.Storage { + return helper(t, dir) + }, Type, dir) + }) +} + +func TestControl(t *testing.T) { + const maxObjectSize = 2048 + + newStore := func(t *testing.T) common.Storage { + return New(WithPath(t.TempDir())) + } + + blobstortest.TestControl(t, newStore, 1024, maxObjectSize) +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/get.go b/pkg/local_object_storage/blobstor/badgerstore/get.go new file mode 100644 index 000000000..ba028e3c2 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/get.go @@ -0,0 +1,107 @@ +package badgerstore + +import ( + "context" + "encoding/hex" + "fmt" + "strconv" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/dgraph-io/badger/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Get implements common.Storage. +func (s *Store) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) { + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Get", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + attribute.Bool("raw", prm.Raw), + )) + defer span.End() + + data, err := s.getObjectData(prm.Address) + if err != nil { + return common.GetRes{}, err + } + + data, err = s.cfg.compression.Decompress(data) + if err != nil { + return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err) + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err) + } + + return common.GetRes{Object: obj, RawData: data}, nil +} + +// GetRange implements common.Storage. +func (s *Store) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) { + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.GetRange", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)), + attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)), + )) + defer span.End() + + data, err := s.getObjectData(prm.Address) + if err != nil { + return common.GetRangeRes{}, err + } + + data, err = s.cfg.compression.Decompress(data) + if err != nil { + return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err) + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err) + } + + from := prm.Range.GetOffset() + to := from + prm.Range.GetLength() + payload := obj.Payload() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange)) + } + + return common.GetRangeRes{ + Data: payload[from:to], + }, nil +} + +func (s *Store) getObjectData(addr oid.Address) ([]byte, error) { + var data []byte + tx := s.db.NewTransaction(false) + defer tx.Discard() + + item, err := tx.Get(key(addr)) + if err != nil { + if err == badger.ErrKeyNotFound { + return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return nil, err + } + + data, err = item.ValueCopy(nil) + if err != nil { + return nil, err + } + return data, nil +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/iterate.go b/pkg/local_object_storage/blobstor/badgerstore/iterate.go new file mode 100644 index 000000000..8a1a1570d --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/iterate.go @@ -0,0 +1,113 @@ +package badgerstore + +import ( + "bytes" + "context" + "fmt" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "github.com/dgraph-io/badger/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Iterate implements common.Storage. +func (s *Store) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) { + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Iterate", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.Bool("ignore_errors", prm.IgnoreErrors), + )) + defer span.End() + + var last []byte + opts := badger.DefaultIteratorOptions + batch := make([]keyValue, 0, opts.PrefetchSize) + opts.PrefetchSize++ // to skip last + for { + select { + case <-ctx.Done(): + return common.IterateRes{}, ctx.Err() + default: + } + + batch = batch[:0] + err := s.db.View(func(tx *badger.Txn) error { + it := tx.NewIterator(opts) + defer it.Close() + + for it.Seek(last); it.Valid(); it.Next() { + if bytes.Equal(last, it.Item().Key()) { + continue + } + + var kv keyValue + var err error + kv.key = it.Item().KeyCopy(nil) + kv.value, err = it.Item().ValueCopy(nil) + if err != nil { + if prm.IgnoreErrors { + continue + } + return err + } + batch = append(batch, kv) + last = kv.key + if len(batch) == opts.PrefetchSize-1 { + break + } + } + return nil + }) + if err != nil { + return common.IterateRes{}, err + } + + select { + case <-ctx.Done(): + return common.IterateRes{}, ctx.Err() + default: + } + + if len(batch) == 0 { + break + } + if err := s.iterateBatch(batch, prm); err != nil { + return common.IterateRes{}, err + } + } + + return common.IterateRes{}, nil +} + +func (s *Store) iterateBatch(batch []keyValue, prm common.IteratePrm) error { + for _, kv := range batch { + addr, err := address(kv.key) + if err != nil { + if prm.IgnoreErrors { + continue + } + } + data, err := s.cfg.compression.Decompress(kv.value) + if err != nil { + if prm.IgnoreErrors { + continue + } + return fmt.Errorf("could not decompress object data: %w", err) + } + + if err := prm.Handler(common.IterationElement{ + Address: addr, + ObjectData: data, + StorageID: defaultStorageID, + }); err != nil { + return err + } + } + return nil +} + +type keyValue struct { + key, value []byte +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/keys.go b/pkg/local_object_storage/blobstor/badgerstore/keys.go new file mode 100644 index 000000000..6a5869d43 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/keys.go @@ -0,0 +1,33 @@ +package badgerstore + +import ( + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +const ( + keyLength = 64 + objectIDOffset = 32 +) + +func key(add oid.Address) []byte { + res := make([]byte, keyLength) + add.Container().Encode(res) + add.Object().Encode(res[objectIDOffset:]) + return res +} + +func address(k []byte) (oid.Address, error) { + var res oid.Address + var containerID cid.ID + var objectID oid.ID + if err := containerID.Decode(k[:objectIDOffset]); err != nil { + return res, err + } + if err := objectID.Decode(k[objectIDOffset:]); err != nil { + return res, err + } + res.SetContainer(containerID) + res.SetObject(objectID) + return res, nil +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/put.go b/pkg/local_object_storage/blobstor/badgerstore/put.go new file mode 100644 index 000000000..213f3a4f1 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/put.go @@ -0,0 +1,40 @@ +package badgerstore + +import ( + "context" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +var defaultStorageID = []byte("badger") + +// Put implements common.Storage. +func (s *Store) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Put", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.String("address", prm.Address.EncodeToString()), + attribute.Bool("dont_compress", prm.DontCompress), + )) + defer span.End() + + if s.readOnly() { + return common.PutRes{}, common.ErrReadOnly + } + + if !prm.DontCompress { + prm.RawData = s.cfg.compression.Compress(prm.RawData) + } + + tx := s.db.NewTransaction(true) + defer tx.Discard() + + err := tx.Set(key(prm.Address), prm.RawData) + if err != nil { + return common.PutRes{}, err + } + return common.PutRes{StorageID: defaultStorageID}, tx.Commit() +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/store.go b/pkg/local_object_storage/blobstor/badgerstore/store.go new file mode 100644 index 000000000..7c1040bf5 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/store.go @@ -0,0 +1,71 @@ +package badgerstore + +import ( + "context" + "sync" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" + "github.com/dgraph-io/badger/v4" +) + +const ( + Type = "badgerstore" +) + +var _ common.Storage = (*Store)(nil) + +type Store struct { + cfg *cfg + db *badger.DB + + modeMtx *sync.Mutex // protects fields in group below + opened bool + gcCancel context.CancelFunc + + wg *sync.WaitGroup +} + +// New returns new Store instance with opts applied. +func New(opts ...Option) *Store { + s := &Store{ + cfg: defaultCfg(), + modeMtx: &sync.Mutex{}, + wg: &sync.WaitGroup{}, + } + for _, opt := range opts { + opt(s.cfg) + } + return s +} + +// Compressor implements common.Storage. +func (s *Store) Compressor() *compression.Config { + return s.cfg.compression +} + +// Path implements common.Storage. +func (s *Store) Path() string { + return s.cfg.db.Dir +} + +// SetCompressor implements common.Storage. +func (s *Store) SetCompressor(cc *compression.Config) { + s.cfg.compression = cc +} + +// SetParentID implements common.Storage. +func (*Store) SetParentID(parentID string) {} + +// SetReportErrorFunc implements common.Storage. +func (*Store) SetReportErrorFunc(func(string, error)) {} + +// Type implements common.Storage. +func (*Store) Type() string { + return Type +} + +// Rebuild implements common.Storage. +func (*Store) Rebuild(context.Context, common.RebuildPrm) (common.RebuildRes, error) { + return common.RebuildRes{}, nil +} diff --git a/pkg/local_object_storage/blobstor/perf_test.go b/pkg/local_object_storage/blobstor/perf_test.go index 72079acba..bbee6f58a 100644 --- a/pkg/local_object_storage/blobstor/perf_test.go +++ b/pkg/local_object_storage/blobstor/perf_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" @@ -77,6 +78,14 @@ var storages = []storage{ ) }, }, + { + desc: "badger", + create: func(dir string) common.Storage { + return badgerstore.New( + badgerstore.WithPath(dir), + ) + }, + }, } func BenchmarkSubstorageReadPerf(b *testing.B) {