WIP: Blobtree substorage #645

Closed
dstepanov-yadro wants to merge 11 commits from dstepanov-yadro/frostfs-node:feat/small_blob_store into master
46 changed files with 2856 additions and 86 deletions

View file

@ -21,7 +21,9 @@ import (
contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
badgerstoreconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore"
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
blobtreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobtree"
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
@ -32,7 +34,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
@ -181,6 +185,14 @@ type subStorageCfg struct {
width uint64
leafWidth uint64
openedCacheSize int
// badgerstore-specific
indexCacheSize int64
memTablesCount int
compactorsCount int
gcInterval time.Duration
gcDiscardRatio float64
valueLogFileSize int64
}
// readConfig fills applicationConfiguration with raw configuration values
@ -302,6 +314,18 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth()
sCfg.noSync = sub.NoSync()
case blobtree.Type:
sub := blobtreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth()
sCfg.size = sub.Size()
case badgerstore.Type:
sub := badgerstoreconfig.From((*config.Config)(storagesCfg[i]))
sCfg.indexCacheSize = sub.IndexCacheSize()
sCfg.memTablesCount = sub.MemTablesCount()
sCfg.compactorsCount = sub.CompactorsCount()
sCfg.gcInterval = sub.GCInterval()
sCfg.gcDiscardRatio = sub.GCDiscardRatio()
sCfg.valueLogFileSize = sub.ValueLogFileSize()
default:
return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
}
@ -788,52 +812,37 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
for _, sRead := range shCfg.subStorages {
switch sRead.typ {
case blobovniczatree.Type:
blobTreeOpts := []blobovniczatree.Option{
blobovniczatree.WithRootPath(sRead.path),
blobovniczatree.WithPermissions(sRead.perm),
blobovniczatree.WithBlobovniczaSize(sRead.size),
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
blobovniczatree.WithLogger(c.log),
}
if c.metricsCollector != nil {
blobTreeOpts = append(blobTreeOpts,
blobovniczatree.WithMetrics(
lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobobvnizcaTreeMetrics()),
),
)
}
blobovniczaTreeOpts := c.getBlobovniczaTreeOpts(sRead)
ss = append(ss, blobstor.SubStorage{
Storage: blobovniczatree.NewBlobovniczaTree(blobTreeOpts...),
Storage: blobovniczatree.NewBlobovniczaTree(blobovniczaTreeOpts...),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
})
case fstree.Type:
fstreeOpts := []fstree.Option{
fstree.WithPath(sRead.path),
fstree.WithPerm(sRead.perm),
fstree.WithDepth(sRead.depth),
fstree.WithNoSync(sRead.noSync),
fstree.WithLogger(c.log),
}
if c.metricsCollector != nil {
fstreeOpts = append(fstreeOpts,
fstree.WithMetrics(
lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()),
),
)
}
fstreeOpts := c.getFSTreeOpts(sRead)
ss = append(ss, blobstor.SubStorage{
Storage: fstree.New(fstreeOpts...),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return true
},
})
case blobtree.Type:
blobTreeOpts := c.getBlobTreeOpts(sRead)
ss = append(ss, blobstor.SubStorage{
Storage: blobtree.New(blobTreeOpts...),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
})
case badgerstore.Type:
badgerStoreOpts := c.getBadgerStoreOpts(sRead)
ss = append(ss, blobstor.SubStorage{
Storage: badgerstore.New(badgerStoreOpts...),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
})
default:
// should never happen, that has already
// been handled: when the config was read
@ -842,6 +851,82 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
return ss
}
func (c *cfg) getBadgerStoreOpts(sRead subStorageCfg) []badgerstore.Option {
badgerStoreOpts := []badgerstore.Option{
badgerstore.WithPath(sRead.path),
badgerstore.WithPermissions(sRead.perm),
badgerstore.WithCompactorsCount(sRead.compactorsCount),
badgerstore.WithGCDiscardRatio(sRead.gcDiscardRatio),
badgerstore.WithGCInterval(sRead.gcInterval),
badgerstore.WithIndexCacheSize(sRead.indexCacheSize),
badgerstore.WithMemTablesCount(sRead.memTablesCount),
badgerstore.WithValueLogSize(sRead.valueLogFileSize),
}
if c.metricsCollector != nil {
badgerStoreOpts = append(badgerStoreOpts,
badgerstore.WithMetrics(
lsmetrics.NewBadgerStoreMetrics(sRead.path, c.metricsCollector.BadgerStoreMetrics())))
}
return badgerStoreOpts
}
func (c *cfg) getBlobTreeOpts(sRead subStorageCfg) []blobtree.Option {
blobTreeOpts := []blobtree.Option{
blobtree.WithPath(sRead.path),
blobtree.WithPerm(sRead.perm),
blobtree.WithDepth(sRead.depth),
blobtree.WithTargetSize(sRead.size),
}
if c.metricsCollector != nil {
blobTreeOpts = append(blobTreeOpts,
blobtree.WithMetrics(
lsmetrics.NewBlobTreeMetrics(sRead.path, c.metricsCollector.BlobTreeMetrics()),
),
)
}
return blobTreeOpts
}
func (c *cfg) getFSTreeOpts(sRead subStorageCfg) []fstree.Option {
fstreeOpts := []fstree.Option{
fstree.WithPath(sRead.path),
fstree.WithPerm(sRead.perm),
fstree.WithDepth(sRead.depth),
fstree.WithNoSync(sRead.noSync),
fstree.WithLogger(c.log),
}
if c.metricsCollector != nil {
fstreeOpts = append(fstreeOpts,
fstree.WithMetrics(
lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()),
),
)
}
return fstreeOpts
}
func (c *cfg) getBlobovniczaTreeOpts(sRead subStorageCfg) []blobovniczatree.Option {
blobTreeOpts := []blobovniczatree.Option{
blobovniczatree.WithRootPath(sRead.path),
blobovniczatree.WithPermissions(sRead.perm),
blobovniczatree.WithBlobovniczaSize(sRead.size),
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
blobovniczatree.WithLogger(c.log),
}
if c.metricsCollector != nil {
blobTreeOpts = append(blobTreeOpts,
blobovniczatree.WithMetrics(
lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobovniczaTreeMetrics()),
),
)
}
return blobTreeOpts
}
func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
writeCacheOpts := c.getWriteCacheOpts(shCfg)
piloramaOpts := c.getPiloramaOpts(shCfg)

View file

@ -0,0 +1,85 @@
package badgerstore
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
)
type Config config.Config
const (
IndexCacheSizeDefault = 256 << 20 //256MB
MemTablesCountDefault = 32
CompactorsCountDefault = 64
GCIntervalDefault = 10 * time.Minute
GCDiscardRatioDefault = 0.2
ValueLogSizeDefault = 1 << 30 //1GB
)
// From wraps config section into Config.
func From(c *config.Config) *Config {
return (*Config)(c)
}
// Type returns the storage type.
func (x *Config) Type() string {
return badgerstore.Type
}
// IndexCacheSize returns `index_cache_size` value or IndexCacheSizeDefault.
func (x *Config) IndexCacheSize() int64 {
s := config.SizeInBytesSafe((*config.Config)(x), "index_cache_size")
if s > 0 {
return int64(s)
}
return IndexCacheSizeDefault
}
// MemTablesCount returns `mem_tables_count` value or MemTablesCountDefault.
func (x *Config) MemTablesCount() int {
v := config.IntSafe((*config.Config)(x), "mem_tables_count")
if v > 0 {
return int(v)
}
return MemTablesCountDefault
}
// CompactorsCount returns `mem_tables_count` value or CompactorsCountDefault.
func (x *Config) CompactorsCount() int {
v := config.IntSafe((*config.Config)(x), "compactors_count")
if v > 0 {
return int(v)
}
return CompactorsCountDefault
}
// GCInterval returns `gc_interval` value or GCIntervalDefault.
func (x *Config) GCInterval() time.Duration {
v := config.DurationSafe((*config.Config)(x), "gc_interval")
if v > 0 {
return v
}
return GCIntervalDefault
}
// GCDiscardRatio returns `gc_discard_percent` value as ratio value (in range (0.0; 1.0)) or GCDiscardRatioDefault.
func (x *Config) GCDiscardRatio() float64 {
v := config.Uint32Safe((*config.Config)(x), "gc_discard_percent")
if v > 0 && v < 100 {
return float64(v) / (float64(100))
}
return GCDiscardRatioDefault
}
// ValueLogFileSize returns `value_log_file_size` value or ValueLogSizeDefault.
func (x *Config) ValueLogFileSize() int64 {
s := config.SizeInBytesSafe((*config.Config)(x), "value_log_file_size")
if s > 0 {
return int64(s)
}
return ValueLogSizeDefault
}

View file

@ -0,0 +1,60 @@
package blobtreeconfig
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
)
// Config is a wrapper over the config section
// which provides access to Blobtree configurations.
type Config config.Config
const (
// SizeDefault is a default limit of estimates of single Blobtree file size.
SizeDefault = 4 * 1024 * 1024
// DepthDefault is a default shallow dir depth.
DepthDefault = 8
)
// From wraps config section into Config.
func From(c *config.Config) *Config {
return (*Config)(c)
}
// Type returns the storage type.
func (x *Config) Type() string {
return blobtree.Type
}
// Size returns the value of "size" config parameter.
//
// Returns SizeDefault if the value is not a positive number.
func (x *Config) Size() uint64 {
s := config.SizeInBytesSafe(
(*config.Config)(x),
"size",
)
if s > 0 {
return s
}
return SizeDefault
}
// ShallowDepth returns the value of "depth" config parameter.
//
// Returns ShallowDepthDefault if the value is not a positive number.
func (x *Config) Depth() uint64 {
d := config.UintSafe(
(*config.Config)(x),
"depth",
)
if d > 0 {
return d
}
return DepthDefault
}

View file

@ -9,7 +9,9 @@ import (
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
)
@ -55,7 +57,7 @@ func validateConfig(c *config.Config) error {
}
for i := range blobstor {
switch blobstor[i].Type() {
case fstree.Type, blobovniczatree.Type:
case fstree.Type, blobovniczatree.Type, blobtree.Type, badgerstore.Type:
default:
return fmt.Errorf("unexpected storage type: %s (shard %d)", blobstor[i].Type(), shardNum)
}

View file

@ -515,5 +515,6 @@ const (
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
FailedToCountWritecacheItems = "failed to count writecache items"
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
BadgerStoreGCFailed = "failed to run GC on badgerstore"
FailedToGetContainerCounters = "failed to get container counters values"
)

View file

@ -97,7 +97,7 @@ func TestBlobovnicza(t *testing.T) {
testPutGet(t, blz, oidtest.Address(), objSizeLim, nil, nil)
}
// blobovnizca accepts object event if full
// blobovnicza accepts object event if full
testPutGet(t, blz, oidtest.Address(), 1024, func(err error) bool {
return err == nil
}, nil)

View file

@ -0,0 +1,136 @@
package badgerstore
import (
"io/fs"
"math"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/badger/v4/options"
"go.uber.org/zap"
)
type cfg struct {
permissions fs.FileMode
compression *compression.Config
db badger.Options
gcTimeout time.Duration
gcDiscardRatio float64
metrics Metrics
logger *logger.Logger
}
type Option func(*cfg)
// defaultCfg creates default options to create Store.
// Default Badger options:
// BaseTableSize: 2MB
// BaseLevelSize: 10MB
// TableSizeMultiplier: 2
// LevelSizeMultiplier: 10
// MaxLevels: 7
// NumLevelZeroTables: 5
// ValueLogFileSize: 1GB
//
// Badger flushes MemTable directly to Level0.
// So for Level0 MemTableSize is used as TableSize https://github.com/dgraph-io/badger/blob/v4.1.0/levels.go#L403.
// There is no total size limit for Level0, only NumLevelZeroTables
//
// Badger uses Dynamic Level Sizes like RocksDB.
// See https://github.com/facebook/rocksdb/blob/v3.11/include/rocksdb/options.h#L366 for explanation.
func defaultCfg() *cfg {
opts := badger.DefaultOptions("/")
opts.BlockCacheSize = 0 // compression and encryption are disabled, so block cache should be disabled
opts.IndexCacheSize = 256 << 20 // 256MB, to not to keep all indicies in memory
opts.Compression = options.None // performed by cfg.compressor
opts.Logger = nil
opts.MetricsEnabled = false
opts.NumLevelZeroTablesStall = math.MaxInt // to not to stall because of Level0 slow compaction
opts.NumMemtables = 32 // default memtable size is 64MB, so max memory consumption will be 2GB before stall
opts.NumCompactors = 64
opts.SyncWrites = true
opts.ValueLogMaxEntries = math.MaxUint32 // default vLog file size is 1GB, so size is more clear than entries count
opts.ValueThreshold = 0 // to store all values in vLog
return &cfg{
permissions: 0o700,
db: opts,
gcTimeout: 10 * time.Minute,
gcDiscardRatio: 0.2, // for 1GB vLog file GC will perform only if around 200MB could be free
metrics: &noopMetrics{},
logger: &logger.Logger{Logger: zap.L()},
}
}
// WithPath sets BadgerStore directory.
func WithPath(dir string) Option {
return func(c *cfg) {
c.db.Dir = dir
c.db.ValueDir = dir
}
}
// WithPermissions sets persmission flags.
func WithPermissions(p fs.FileMode) Option {
return func(c *cfg) {
c.permissions = p
}
}
// WithIndexCacheSize sets BadgerStore index cache size.
func WithIndexCacheSize(sz int64) Option {
return func(c *cfg) {
c.db.IndexCacheSize = sz
}
}
// WithMemTablesCount sets maximum count of memtables.
func WithMemTablesCount(count int) Option {
return func(c *cfg) {
c.db.NumMemtables = count
}
}
// WithCompactorsCount sets count of concurrent compactors.
func WithCompactorsCount(count int) Option {
return func(c *cfg) {
c.db.NumCompactors = count
}
}
// WithGCInterval sets GC interval value.
func WithGCInterval(d time.Duration) Option {
return func(c *cfg) {
c.gcTimeout = d
}
}
// WithGCDiscardRatio sets GC discard ratio.
func WithGCDiscardRatio(r float64) Option {
return func(c *cfg) {
c.gcDiscardRatio = r
}
}
// WithValueLogSize sets max value log size.
func WithValueLogSize(sz int64) Option {
return func(c *cfg) {
c.db.ValueLogFileSize = sz
}
}
// WithMetrics sets metrics.
func WithMetrics(m Metrics) Option {
return func(c *cfg) {
c.metrics = m
}
}
// WithLogger sets logger.
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.logger = l
}
}

View file

@ -0,0 +1,97 @@
package badgerstore
import (
"context"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"github.com/dgraph-io/badger/v4"
"go.uber.org/zap"
)
// Close implements common.Storage.
func (s *Store) Close() error {
s.modeMtx.Lock()
defer s.modeMtx.Unlock()
if !s.opened {
return nil
}
if s.gcCancel != nil {
s.gcCancel()
}
s.wg.Wait()
if err := s.db.Close(); err != nil {
return err
}
s.opened = false
s.cfg.metrics.Close()
return nil
}
// Init implements common.Storage.
func (s *Store) Init() error {
s.modeMtx.Lock()
defer s.modeMtx.Unlock()
if !s.opened {
return fmt.Errorf("store must be opened before initialization")
}
s.startGC()
return nil
}
func (s *Store) startGC() {
ctx, cancel := context.WithCancel(context.Background())
s.gcCancel = cancel
t := time.NewTicker(s.cfg.gcTimeout)
s.wg.Add(1)
go func() {
defer s.wg.Done()
select {
case <-ctx.Done():
return
case <-t.C:
if err := s.db.RunValueLogGC(s.cfg.gcDiscardRatio); err == nil {
_ = s.db.RunValueLogGC(s.cfg.gcDiscardRatio) // see https://dgraph.io/docs/badger/get-started/#garbage-collection
} else {
s.cfg.logger.Error(logs.BadgerStoreGCFailed, zap.Error(err), zap.String("path", s.cfg.db.Dir))
}
}
}()
}
// Open implements common.Storage.
func (s *Store) Open(readOnly bool) error {
s.modeMtx.Lock()
defer s.modeMtx.Unlock()
if s.opened {
return nil
}
err := util.MkdirAllX(s.cfg.db.Dir, s.cfg.permissions)
if err != nil {
return err
}
s.cfg.db.ReadOnly = readOnly
if s.db, err = badger.Open(s.cfg.db); err != nil {
return err
}
s.opened = true
s.cfg.metrics.SetMode(readOnly)
return nil
}
func (s *Store) readOnly() bool {
return s.cfg.db.ReadOnly
}

View file

@ -0,0 +1,55 @@
package badgerstore
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"github.com/dgraph-io/badger/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// Delete implements common.Storage.
func (s *Store) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
success := false
startedAt := time.Now()
defer func() {
s.cfg.metrics.Delete(time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Delete",
trace.WithAttributes(
attribute.String("path", s.cfg.db.Dir),
attribute.String("address", prm.Address.EncodeToString()),
))
defer span.End()
if s.readOnly() {
return common.DeleteRes{}, common.ErrReadOnly
}
tx := s.db.NewTransaction(true)
defer tx.Discard()
_, err := tx.Get(key(prm.Address))
if err != nil {
if err == badger.ErrKeyNotFound {
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return common.DeleteRes{}, err
}
if err = tx.Delete(key(prm.Address)); err != nil {
return common.DeleteRes{}, err
}
if err = tx.Commit(); err != nil {
return common.DeleteRes{}, err
}
success = true
return common.DeleteRes{}, nil
}

View file

@ -0,0 +1,46 @@
package badgerstore
import (
"context"
"encoding/hex"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"github.com/dgraph-io/badger/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// Exists implements common.Storage.
func (s *Store) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
success := false
startedAt := time.Now()
defer func() {
s.cfg.metrics.Exists(time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Exists",
trace.WithAttributes(
attribute.String("path", s.cfg.db.Dir),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
))
defer span.End()
tx := s.db.NewTransaction(false)
defer tx.Discard()
_, err := tx.Get(key(prm.Address))
if err != nil {
if err == badger.ErrKeyNotFound {
success = true
return common.ExistsRes{Exists: false}, nil
}
return common.ExistsRes{}, err
}
success = true
return common.ExistsRes{Exists: true}, nil
}

View file

@ -0,0 +1,39 @@
package badgerstore
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
)
func TestGeneric(t *testing.T) {
const maxObjectSize = 1 << 16
helper := func(t *testing.T, dir string) common.Storage {
return New(WithPath(dir))
}
newStore := func(t *testing.T) common.Storage {
return helper(t, t.TempDir())
}
blobstortest.TestAll(t, newStore, 1024, maxObjectSize)
t.Run("info", func(t *testing.T) {
dir := t.TempDir()
blobstortest.TestInfo(t, func(t *testing.T) common.Storage {
return helper(t, dir)
}, Type, dir)
})
}
func TestControl(t *testing.T) {
const maxObjectSize = 2048
newStore := func(t *testing.T) common.Storage {
return New(WithPath(t.TempDir()))
}
blobstortest.TestControl(t, newStore, 1024, maxObjectSize)
}

View file

@ -0,0 +1,127 @@
package badgerstore
import (
"context"
"encoding/hex"
"fmt"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/dgraph-io/badger/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// Get implements common.Storage.
func (s *Store) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
success := false
size := 0
startedAt := time.Now()
defer func() {
s.cfg.metrics.Get(time.Since(startedAt), size, success)
}()
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Get",
trace.WithAttributes(
attribute.String("path", s.cfg.db.Dir),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
attribute.Bool("raw", prm.Raw),
))
defer span.End()
data, err := s.getObjectData(prm.Address)
if err != nil {
return common.GetRes{}, err
}
data, err = s.cfg.compression.Decompress(data)
if err != nil {
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
success = true
size = len(data)
return common.GetRes{Object: obj, RawData: data}, nil
}
// GetRange implements common.Storage.
func (s *Store) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
success := false
size := 0
startedAt := time.Now()
defer func() {
s.cfg.metrics.GetRange(time.Since(startedAt), size, success)
}()
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.GetRange",
trace.WithAttributes(
attribute.String("path", s.cfg.db.Dir),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
))
defer span.End()
data, err := s.getObjectData(prm.Address)
if err != nil {
return common.GetRangeRes{}, err
}
data, err = s.cfg.compression.Decompress(data)
if err != nil {
return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err)
}
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
}
from := prm.Range.GetOffset()
to := from + prm.Range.GetLength()
payload := obj.Payload()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
}
res := common.GetRangeRes{Data: payload[from:to]}
success = true
size = len(res.Data)
return res, nil
}
func (s *Store) getObjectData(addr oid.Address) ([]byte, error) {
var data []byte
tx := s.db.NewTransaction(false)
defer tx.Discard()
item, err := tx.Get(key(addr))
if err != nil {
if err == badger.ErrKeyNotFound {
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return nil, err
}
data, err = item.ValueCopy(nil)
if err != nil {
return nil, err
}
return data, nil
}

View file

@ -0,0 +1,121 @@
package badgerstore
import (
"bytes"
"context"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"github.com/dgraph-io/badger/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// Iterate implements common.Storage.
func (s *Store) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
success := false
startedAt := time.Now()
defer func() {
s.cfg.metrics.Iterate(time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Iterate",
trace.WithAttributes(
attribute.String("path", s.cfg.db.Dir),
attribute.Bool("ignore_errors", prm.IgnoreErrors),
))
defer span.End()
var last []byte
opts := badger.DefaultIteratorOptions
batch := make([]keyValue, 0, opts.PrefetchSize)
opts.PrefetchSize++ // to skip last
for {
select {
case <-ctx.Done():
return common.IterateRes{}, ctx.Err()
default:
}
batch = batch[:0]
err := s.db.View(func(tx *badger.Txn) error {
it := tx.NewIterator(opts)
defer it.Close()
for it.Seek(last); it.Valid(); it.Next() {
if bytes.Equal(last, it.Item().Key()) {
continue
}
var kv keyValue
var err error
kv.key = it.Item().KeyCopy(nil)
kv.value, err = it.Item().ValueCopy(nil)
if err != nil {
if prm.IgnoreErrors {
continue
}
return err
}
batch = append(batch, kv)
last = kv.key
if len(batch) == opts.PrefetchSize-1 {
break
}
}
return nil
})
if err != nil {
return common.IterateRes{}, err
}
select {
case <-ctx.Done():
return common.IterateRes{}, ctx.Err()
default:
}
if len(batch) == 0 {
break
}
if err := s.iterateBatch(batch, prm); err != nil {
return common.IterateRes{}, err
}
}
success = true
return common.IterateRes{}, nil
}
func (s *Store) iterateBatch(batch []keyValue, prm common.IteratePrm) error {
for _, kv := range batch {
addr, err := address(kv.key)
if err != nil {
if prm.IgnoreErrors {
continue
}
}
data, err := s.cfg.compression.Decompress(kv.value)
if err != nil {
if prm.IgnoreErrors {
continue
}
return fmt.Errorf("could not decompress object data: %w", err)
}
if err := prm.Handler(common.IterationElement{
Address: addr,
ObjectData: data,
}); err != nil {
return err
}
}
return nil
}
type keyValue struct {
key, value []byte
}

View file

@ -0,0 +1,28 @@
package badgerstore
import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
func key(add oid.Address) []byte {
res := make([]byte, 64)
add.Container().Encode(res)
add.Object().Encode(res[32:])
return res
}
func address(k []byte) (oid.Address, error) {
var res oid.Address
var containerID cid.ID
var objectID oid.ID
if err := containerID.Decode(k[:32]); err != nil {
return res, err
}
if err := objectID.Decode(k[32:]); err != nil {
return res, err
}
res.SetContainer(containerID)
res.SetObject(objectID)
return res, nil
}

View file

@ -0,0 +1,31 @@
package badgerstore
import "time"
type Metrics interface {
SetParentID(parentID string)
SetMode(readOnly bool)
Close()
Delete(d time.Duration, success bool)
Exists(d time.Duration, success bool)
GetRange(d time.Duration, size int, success bool)
Get(d time.Duration, size int, success bool)
Iterate(d time.Duration, success bool)
Put(d time.Duration, size int, success bool)
}
var _ Metrics = (*noopMetrics)(nil)
type noopMetrics struct{}
func (*noopMetrics) Close() {}
func (*noopMetrics) Delete(time.Duration, bool) {}
func (*noopMetrics) Exists(time.Duration, bool) {}
func (*noopMetrics) Get(time.Duration, int, bool) {}
func (*noopMetrics) GetRange(time.Duration, int, bool) {}
func (*noopMetrics) Iterate(time.Duration, bool) {}
func (*noopMetrics) Put(time.Duration, int, bool) {}
func (*noopMetrics) SetMode(bool) {}
func (*noopMetrics) SetParentID(string) {}

View file

@ -0,0 +1,52 @@
package badgerstore
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// Put implements common.Storage.
func (s *Store) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
success := false
size := 0
startedAt := time.Now()
defer func() {
s.cfg.metrics.Put(time.Since(startedAt), size, success)
}()
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Put",
trace.WithAttributes(
attribute.String("path", s.cfg.db.Dir),
attribute.String("address", prm.Address.EncodeToString()),
attribute.Bool("dont_compress", prm.DontCompress),
))
defer span.End()
if s.readOnly() {
return common.PutRes{}, common.ErrReadOnly
}
if !prm.DontCompress {
prm.RawData = s.cfg.compression.Compress(prm.RawData)
}
b := s.db.NewWriteBatch()
defer b.Cancel()
err := b.Set(key(prm.Address), prm.RawData)
if err != nil {
return common.PutRes{}, err
}
if err = b.Flush(); err != nil {
return common.PutRes{}, err
}
success = true
size = len(prm.RawData)
return common.PutRes{}, nil
}

View file

@ -0,0 +1,68 @@
package badgerstore
import (
"context"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"github.com/dgraph-io/badger/v4"
)
const (
Type = "badgerstore"
)
var _ common.Storage = (*Store)(nil)
type Store struct {
cfg *cfg
db *badger.DB
modeMtx *sync.Mutex // protects fields in group below
opened bool
gcCancel context.CancelFunc
wg *sync.WaitGroup
}
// New returns new Store instance with opts applied.
func New(opts ...Option) *Store {
s := &Store{
cfg: defaultCfg(),
modeMtx: &sync.Mutex{},
wg: &sync.WaitGroup{},
}
for _, opt := range opts {
opt(s.cfg)
}
return s
}
// Compressor implements common.Storage.
func (s *Store) Compressor() *compression.Config {
return s.cfg.compression
}
// Path implements common.Storage.
func (s *Store) Path() string {
return s.cfg.db.Dir
}
// SetCompressor implements common.Storage.
func (s *Store) SetCompressor(cc *compression.Config) {
s.cfg.compression = cc
}
// SetParentID implements common.Storage.
func (s *Store) SetParentID(parentID string) {
s.cfg.metrics.SetParentID(parentID)
}
// SetReportErrorFunc implements common.Storage.
func (*Store) SetReportErrorFunc(func(string, error)) {}
// Type implements common.Storage.
func (*Store) Type() string {
return Type
}

View file

@ -50,7 +50,7 @@ func (b *Blobovniczas) Close() error {
b.dbCache.Close() // order important
b.activeDBManager.Close()
b.commondbManager.Close()
b.metrics.Close()
return nil
}

View file

@ -15,7 +15,7 @@ import (
"go.uber.org/zap"
)
// Put saves object in the maximum weight blobobnicza.
// Put saves object in the maximum weight blobovnicza.
//
// returns error if could not save object in any blobovnicza.
func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {

View file

@ -0,0 +1,83 @@
package blobtree
import (
"errors"
"path/filepath"
"strings"
"sync/atomic"
"syscall"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
var _ common.Storage = &BlobTree{}
type BlobTree struct {
cfg cfg
dirLock *utilSync.KeyLocker[string]
fileLock *utilSync.KeyLocker[string]
compressor *compression.Config
dispatcher *rootDispatcher
suffix atomic.Uint64
}
func New(opts ...Option) *BlobTree {
b := &BlobTree{
cfg: cfg{
targetFileSizeBytes: 4 * 1024 * 1024,
rootPath: "./",
depth: 3,
permissions: 0700,
initWorkersCount: 1000,
metrics: &noopMetrics{},
},
dirLock: utilSync.NewKeyLocker[string](),
fileLock: utilSync.NewKeyLocker[string](),
}
for _, opt := range opts {
opt(&b.cfg)
}
b.dispatcher = newRootDispatcher()
return b
}
func (b *BlobTree) getDir(addr oid.Address) string {
sAddr := addr.Object().EncodeToString() + "." + addr.Container().EncodeToString()
var sb strings.Builder
size := int(b.cfg.depth * (directoryLength + 1)) // (character + slash for every level)
sb.Grow(size)
for i := uint64(0); i < b.cfg.depth; i++ {
if i > 0 {
sb.WriteRune(filepath.Separator)
}
sb.WriteString(sAddr[:directoryLength])
sAddr = sAddr[directoryLength:]
}
return sb.String()
}
func (b *BlobTree) createDir(dir string, isSystemPath bool) error {
b.dirLock.Lock(dir)
defer b.dirLock.Unlock(dir)
if !isSystemPath {
dir = b.getSystemPath(dir)
}
if err := util.MkdirAllX(dir, b.cfg.permissions); err != nil {
if errors.Is(err, syscall.ENOSPC) {
err = common.ErrNoSpace
return err
}
return err
}
return nil
}

View file

@ -0,0 +1,15 @@
package blobtree
import "io/fs"
var directoryLength uint64 = 1
type cfg struct {
rootPath string
depth uint64
targetFileSizeBytes uint64
permissions fs.FileMode
readOnly bool
initWorkersCount int
metrics Metrics
}

View file

@ -0,0 +1,209 @@
package blobtree
import (
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
const (
defaultVersion = 0
sizeOfVersion = 1
sizeOfCount = 8
sizeOfDataLength = 8
sizeOfContainerID = sha256.Size
sizeOfObjectID = sha256.Size
dataExtension = ".data"
)
var (
errFileToSmall = errors.New("invalid file content: not enough bytes to read count of records")
errInvalidFileContentVersion = errors.New("invalid file content: not enough bytes to read record version")
errInvalidFileContentContainerID = errors.New("invalid file content: not enough bytes to read container ID")
errInvalidFileContentObjectID = errors.New("invalid file content: not enough bytes to read object ID")
errInvalidFileContentLength = errors.New("invalid file content: not enough bytes to read data length")
errInvalidFileContentData = errors.New("invalid file content: not enough bytes to read data")
)
type objectData struct {
Version byte
Address oid.Address
Data []byte
}
func (b *BlobTree) readFileContent(path string) ([]objectData, error) {
rawData, err := os.ReadFile(b.getSystemPath(path))
if err != nil {
if os.IsNotExist(err) {
return []objectData{}, nil
}
return nil, err
}
return b.unmarshalSlice(rawData)
}
func (b *BlobTree) unmarshalSlice(data []byte) ([]objectData, error) {
if len(data) < sizeOfCount {
return nil, errFileToSmall
}
count := binary.LittleEndian.Uint64(data[:8])
result := make([]objectData, 0, count)
data = data[sizeOfCount:]
var idx uint64
for idx = 0; idx < count; idx++ {
record, read, err := b.unmarshalRecord(data)
if err != nil {
return nil, err
}
result = append(result, record)
data = data[read:]
}
return result, nil
}
func (b *BlobTree) unmarshalRecord(data []byte) (objectData, uint64, error) {
if len(data) < sizeOfVersion {
return objectData{}, 0, errInvalidFileContentVersion
}
var result objectData
var read uint64
result.Version = data[0]
if result.Version != defaultVersion {
return objectData{}, 0, fmt.Errorf("invalid file content: unknown version %d", result.Version)
}
read += sizeOfVersion
if len(data[read:]) < sizeOfContainerID {
return objectData{}, 0, errInvalidFileContentContainerID
}
var contID cid.ID
if err := contID.Decode(data[read : read+sizeOfContainerID]); err != nil {
return objectData{}, 0, fmt.Errorf("invalid file content: failed to read container ID: %w", err)
}
read += sizeOfContainerID
if len(data[read:]) < sizeOfObjectID {
return objectData{}, 0, errInvalidFileContentObjectID
}
var objID oid.ID
if err := objID.Decode(data[read : read+sizeOfObjectID]); err != nil {
return objectData{}, 0, fmt.Errorf("invalid file content: failed to read object ID: %w", err)
}
read += sizeOfObjectID
result.Address.SetContainer(contID)
result.Address.SetObject(objID)
if len(data[read:]) < sizeOfDataLength {
return objectData{}, 0, errInvalidFileContentLength
}
dataLength := binary.LittleEndian.Uint64(data[read : read+sizeOfDataLength])
read += sizeOfDataLength
if uint64(len(data[read:])) < dataLength {
return objectData{}, 0, errInvalidFileContentData
}
result.Data = make([]byte, dataLength)
copy(result.Data, data[read:read+dataLength])
read += dataLength
return result, read, nil
}
func (b *BlobTree) saveContentToFile(records []objectData, path string) (uint64, error) {
data, err := b.marshalSlice(records)
if err != nil {
return 0, err
}
return uint64(len(data)), b.writeFile(path, data)
}
func (b *BlobTree) writeFile(p string, data []byte) error {
p = b.getSystemPath(p)
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_EXCL|os.O_SYNC, b.cfg.permissions)
if err != nil {
return err
}
_, err = f.Write(data)
if err1 := f.Close(); err1 != nil && err == nil {
err = err1
}
return err
}
func (b *BlobTree) getSystemPath(path string) string {
return filepath.Join(b.cfg.rootPath, path)
}
func (b *BlobTree) marshalSlice(records []objectData) ([]byte, error) {
buf := make([]byte, b.estimateSize(records))
result := buf
binary.LittleEndian.PutUint64(buf, uint64(len(records)))
buf = buf[sizeOfCount:]
for _, record := range records {
written := b.marshalRecord(record, buf)
buf = buf[written:]
}
return result, nil
}
func (b *BlobTree) marshalRecord(record objectData, dst []byte) uint64 {
var written uint64
dst[0] = record.Version
dst = dst[sizeOfVersion:]
written += sizeOfVersion
record.Address.Container().Encode(dst)
dst = dst[sizeOfContainerID:]
written += sizeOfContainerID
record.Address.Object().Encode(dst)
dst = dst[sizeOfObjectID:]
written += sizeOfObjectID
binary.LittleEndian.PutUint64(dst, uint64(len(record.Data)))
dst = dst[sizeOfDataLength:]
written += sizeOfDataLength
copy(dst, record.Data)
written += uint64(len(record.Data))
return written
}
func (b *BlobTree) estimateSize(records []objectData) uint64 {
var result uint64
result += sizeOfCount
for _, record := range records {
result += (sizeOfVersion + sizeOfContainerID + sizeOfObjectID + sizeOfDataLength)
result += uint64(len(record.Data))
}
return result
}
func (b *BlobTree) getFilePath(dir string, idx uint64) string {
return filepath.Join(dir, strconv.FormatUint(idx, 16)+dataExtension)
}
func (b *BlobTree) parsePath(path string) (string, uint64, error) {
dir := filepath.Dir(path)
fileName := strings.TrimSuffix(filepath.Base(path), dataExtension)
idx, err := strconv.ParseUint(fileName, 16, 64)
if err != nil {
return "", 0, fmt.Errorf("failed to parse blobtree path: %w", err)
}
return dir, idx, nil
}

View file

@ -0,0 +1,102 @@
package blobtree
import (
"os"
"path/filepath"
"strconv"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"golang.org/x/sync/errgroup"
)
var Type = "blobtree"
func (b *BlobTree) Open(readOnly bool) error {
b.cfg.readOnly = readOnly
b.cfg.metrics.SetMode(readOnly)
return nil
}
func (b *BlobTree) Init() error {
if err := b.createDir(b.cfg.rootPath, true); err != nil {
return err
}
var eg errgroup.Group
eg.SetLimit(b.cfg.initWorkersCount)
eg.Go(func() error {
return b.initDir(&eg, "")
})
return eg.Wait()
}
func (b *BlobTree) initDir(eg *errgroup.Group, dir string) error {
entities, err := os.ReadDir(b.getSystemPath(dir))
if err != nil {
return err
}
for _, entity := range entities {
if entity.IsDir() {
eg.Go(func() error {
return b.initDir(eg, filepath.Join(dir, entity.Name()))
})
continue
}
path := filepath.Join(dir, entity.Name())
if b.isTempFile(entity.Name()) {
if err = os.Remove(b.getSystemPath(path)); err != nil {
return err
}
continue
}
idx, err := b.parseIdx(entity.Name())
if err != nil {
return err
}
b.dispatcher.Init(dir, idx)
b.cfg.metrics.IncFilesCount()
stat, err := os.Stat(b.getSystemPath(path))
if err != nil {
return err
}
if stat.Size() < int64(b.cfg.targetFileSizeBytes) {
b.dispatcher.ReturnIdx(dir, idx)
}
}
return nil
}
func (b *BlobTree) isTempFile(name string) bool {
return strings.Contains(name, tempFileSymbols)
}
func (b *BlobTree) parseIdx(name string) (uint64, error) {
return strconv.ParseUint(strings.TrimSuffix(name, dataExtension), 16, 64)
}
func (b *BlobTree) Close() error {
b.cfg.metrics.Close()
return nil
}
func (b *BlobTree) Type() string { return Type }
func (b *BlobTree) Path() string { return b.cfg.rootPath }
func (b *BlobTree) SetCompressor(cc *compression.Config) {
b.compressor = cc
}
func (b *BlobTree) Compressor() *compression.Config {
return b.compressor
}
func (b *BlobTree) SetReportErrorFunc(_ func(string, error)) {}
func (b *BlobTree) SetParentID(parentID string) {
b.cfg.metrics.SetParentID(parentID)
}

View file

@ -0,0 +1,151 @@
package blobtree
import (
"context"
"os"
"path/filepath"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
func (b *BlobTree) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
var (
success = false
startedAt = time.Now()
)
defer func() {
b.cfg.metrics.Delete(time.Since(startedAt), success, prm.StorageID != nil)
}()
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Delete",
trace.WithAttributes(
attribute.String("path", b.cfg.rootPath),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", string(prm.StorageID)),
))
defer span.End()
if b.cfg.readOnly {
return common.DeleteRes{}, common.ErrReadOnly
}
var res common.DeleteRes
var err error
if path, ok := getPathFromStorageID(prm.StorageID); ok {
res, err = b.deleteFromPath(prm.Address, path)
} else {
res, err = b.findAndDelete(prm.Address)
}
success = err == nil
return res, err
}
func (b *BlobTree) deleteFromPath(addr oid.Address, path string) (common.DeleteRes, error) {
b.fileLock.Lock(path)
defer b.fileLock.Unlock(path)
dir, idx, err := b.parsePath(path)
if err != nil {
return common.DeleteRes{}, err
}
records, err := b.readFileContent(path)
if err != nil {
return common.DeleteRes{}, err
}
deleteIdx := -1
for i := range records {
if records[i].Address.Equals(addr) {
deleteIdx = i
break
}
}
if deleteIdx == -1 {
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
if len(records) == 1 {
err = os.Remove(b.getSystemPath(path))
if err == nil {
b.dispatcher.ReturnIdx(dir, idx)
b.cfg.metrics.DecFilesCount()
}
return common.DeleteRes{}, err
}
records = append(records[:deleteIdx], records[deleteIdx+1:]...)
size, err := b.writeToTmpAndRename(records, path)
if err != nil {
return common.DeleteRes{}, err
}
if size < b.cfg.targetFileSizeBytes {
b.dispatcher.ReturnIdx(dir, idx)
}
return common.DeleteRes{}, nil
}
func (b *BlobTree) findAndDelete(addr oid.Address) (common.DeleteRes, error) {
dir := b.getDir(addr)
idx, err := b.findFileIdx(dir, addr)
if err != nil {
return common.DeleteRes{}, err
}
return b.deleteFromPath(addr, b.getFilePath(dir, idx))
}
func (b *BlobTree) findFileIdx(dir string, addr oid.Address) (uint64, error) {
entities, err := os.ReadDir(filepath.Join(b.cfg.rootPath, dir))
if err != nil {
if os.IsNotExist(err) {
return 0, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return 0, err
}
for _, entity := range entities {
if entity.IsDir() {
continue
}
if b.isTempFile(entity.Name()) {
continue
}
idx, err := b.parseIdx(entity.Name())
if err != nil {
continue
}
path := b.getFilePath(dir, idx)
contains, err := b.fileContainsObject(path, addr)
if err != nil {
return 0, err
}
if contains {
return idx, nil
}
}
return 0, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
func (b *BlobTree) fileContainsObject(path string, addr oid.Address) (bool, error) {
b.fileLock.RLock(path)
defer b.fileLock.RUnlock(path)
records, err := b.readFileContent(path)
if err != nil {
return false, err
}
for i := range records {
if records[i].Address.Equals(addr) {
return true, nil
}
}
return false, nil
}

View file

@ -0,0 +1,94 @@
package blobtree
import (
"sync"
)
type rootDispatcher struct {
dispatchers map[string]*dirDispatcher
guard sync.Mutex
}
func newRootDispatcher() *rootDispatcher {
return &rootDispatcher{
dispatchers: make(map[string]*dirDispatcher),
}
}
func (d *rootDispatcher) GetIdxForWrite(dir string) uint64 {
return d.getDirDispatcher(dir).GetIdxForWrite()
}
func (d *rootDispatcher) ReturnIdx(dir string, idx uint64) {
d.getDirDispatcher(dir).ReturnIdx(idx)
}
func (d *rootDispatcher) Init(dir string, idx uint64) {
d.getDirDispatcher(dir).Init(idx)
}
func (d *rootDispatcher) getDirDispatcher(dir string) *dirDispatcher {
d.guard.Lock()
defer d.guard.Unlock()
if result, ok := d.dispatchers[dir]; ok {
return result
}
result := newDirDispatcher(dir)
d.dispatchers[dir] = result
return result
}
type dirDispatcher struct {
dir string
guard sync.Mutex
indicies map[uint64]struct{}
nextIndex uint64
}
func newDirDispatcher(dir string) *dirDispatcher {
return &dirDispatcher{
dir: dir,
indicies: make(map[uint64]struct{}),
}
}
func (d *dirDispatcher) GetIdxForWrite() uint64 {
d.guard.Lock()
defer d.guard.Unlock()
var result uint64
var found bool
for idx := range d.indicies {
result = idx
found = true
break
}
if found {
delete(d.indicies, result)
return result
}
result = d.nextIndex
d.nextIndex++
return result
}
func (d *dirDispatcher) ReturnIdx(idx uint64) {
d.guard.Lock()
defer d.guard.Unlock()
d.indicies[idx] = struct{}{}
}
func (d *dirDispatcher) Init(idx uint64) {
d.guard.Lock()
defer d.guard.Unlock()
if d.nextIndex <= idx {
d.nextIndex = idx + 1
}
}

View file

@ -0,0 +1,29 @@
package blobtree
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestDispatcher(t *testing.T) {
t.Parallel()
d := newRootDispatcher()
idx := d.GetIdxForWrite("/dir1")
require.Equal(t, uint64(0), idx)
d.ReturnIdx("/dir1", idx)
idx = d.GetIdxForWrite("/dir1")
require.Equal(t, uint64(0), idx)
idx = d.GetIdxForWrite("/dir1")
require.Equal(t, uint64(1), idx)
d.Init("/dir2", 5)
idx = d.GetIdxForWrite("/dir2")
require.Equal(t, uint64(6), idx)
idx = d.GetIdxForWrite("/dir2")
require.Equal(t, uint64(7), idx)
}

View file

@ -0,0 +1,76 @@
package blobtree
import (
"context"
"errors"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
func (b *BlobTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
b.cfg.metrics.Exists(time.Since(startedAt), success, prm.StorageID != nil)
}()
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Exists",
trace.WithAttributes(
attribute.String("path", b.cfg.rootPath),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", string(prm.StorageID)),
))
defer span.End()
var res common.ExistsRes
var err error
if path, ok := getPathFromStorageID(prm.StorageID); ok {
res, err = b.existsFromPath(prm.Address, path)
} else {
res, err = b.findAndCheckExistance(prm.Address)
}
success = err == nil
return res, err
}
func (b *BlobTree) existsFromPath(addr oid.Address, path string) (common.ExistsRes, error) {
b.fileLock.RLock(path)
defer b.fileLock.RUnlock(path)
records, err := b.readFileContent(path)
if err != nil {
return common.ExistsRes{}, err
}
for i := range records {
if records[i].Address.Equals(addr) {
return common.ExistsRes{
Exists: true,
}, nil
}
}
return common.ExistsRes{}, nil
}
func (b *BlobTree) findAndCheckExistance(addr oid.Address) (common.ExistsRes, error) {
dir := b.getDir(addr)
_, err := b.findFileIdx(dir, addr)
if err == nil {
return common.ExistsRes{Exists: true}, nil
}
var notFound *apistatus.ObjectNotFound
if errors.As(err, &notFound) {
return common.ExistsRes{}, nil
}
return common.ExistsRes{}, err
}

View file

@ -0,0 +1,39 @@
package blobtree
import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
)
func TestGeneric(t *testing.T) {
newTreeFromPath := func(path string) common.Storage {
return New(
WithPath(path),
WithDepth(2))
}
newTree := func(t *testing.T) common.Storage {
return newTreeFromPath(t.TempDir())
}
blobstortest.TestAll(t, newTree, 2048, 16*1024)
t.Run("info", func(t *testing.T) {
path := t.TempDir()
blobstortest.TestInfo(t, func(*testing.T) common.Storage {
return newTreeFromPath(path)
}, Type, path)
})
}
func TestControl(t *testing.T) {
newTree := func(t *testing.T) common.Storage {
return New(
WithPath(t.TempDir()),
WithDepth(2))
}
blobstortest.TestControl(t, newTree, 2048, 2048)
}

View file

@ -0,0 +1,129 @@
package blobtree
import (
"context"
"os"
"path/filepath"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
func (b *BlobTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
var (
startedAt = time.Now()
success = false
size = 0
)
defer func() {
b.cfg.metrics.Get(time.Since(startedAt), size, success, prm.StorageID != nil)
}()
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Get",
trace.WithAttributes(
attribute.String("path", b.cfg.rootPath),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", string(prm.StorageID)),
attribute.Bool("raw", prm.Raw),
))
defer span.End()
res, err := b.get(prm)
success = err == nil
size = len(res.RawData)
return res, err
}
func (b *BlobTree) get(prm common.GetPrm) (common.GetRes, error) {
if path, ok := getPathFromStorageID(prm.StorageID); ok {
return b.getFromPath(prm.Address, path)
}
return b.findAndGet(prm.Address)
}
func (b *BlobTree) getFromPath(addr oid.Address, path string) (common.GetRes, error) {
b.fileLock.RLock(path)
defer b.fileLock.RUnlock(path)
records, err := b.readFileContent(path)
if err != nil {
return common.GetRes{}, err
}
for _, record := range records {
if record.Address.Equals(addr) {
return b.unmarshalGetRes(record)
}
}
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
func (b *BlobTree) unmarshalGetRes(record objectData) (common.GetRes, error) {
data, err := b.compressor.Decompress(record.Data)
if err != nil {
return common.GetRes{}, err
}
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
return common.GetRes{}, err
}
return common.GetRes{Object: obj, RawData: data}, nil
}
func (b *BlobTree) findAndGet(addr oid.Address) (common.GetRes, error) {
dir := b.getDir(addr)
entities, err := os.ReadDir(filepath.Join(b.cfg.rootPath, dir))
if err != nil {
if os.IsNotExist(err) {
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
return common.GetRes{}, err
}
for _, entity := range entities {

My suggestion is not the big deal but what do you think about using filepath.WalkDir?

My suggestion is not the big deal but what do you think about using `filepath.WalkDir`?
if entity.IsDir() {
continue
}
if b.isTempFile(entity.Name()) {
continue
}
path := filepath.Join(dir, entity.Name())
res, err := b.tryReadObject(path, addr)
if err != nil {
return common.GetRes{}, err
}
if res.Object != nil {
return res, nil
}
}
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
}
func (b *BlobTree) tryReadObject(path string, addr oid.Address) (common.GetRes, error) {
b.fileLock.RLock(path)
defer b.fileLock.RUnlock(path)
records, err := b.readFileContent(path)
if err != nil {
return common.GetRes{}, err
}
for _, record := range records {
if record.Address.Equals(addr) {
res, err := b.unmarshalGetRes(record)
if err != nil {
return common.GetRes{}, err
}
return res, nil
}
}
return common.GetRes{}, nil
}

View file

@ -0,0 +1,55 @@
package blobtree
import (
"context"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
func (b *BlobTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
var (
startedAt = time.Now()
success = false
size = 0
)
defer func() {
b.cfg.metrics.GetRange(time.Since(startedAt), size, success, prm.StorageID != nil)
}()
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.GetRange",
trace.WithAttributes(
attribute.String("path", b.cfg.rootPath),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("storage_id", string(prm.StorageID)),
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
))
defer span.End()
gRes, err := b.get(common.GetPrm{Address: prm.Address, StorageID: prm.StorageID})
if err != nil {
return common.GetRangeRes{}, err
}
payload := gRes.Object.Payload()
from := prm.Range.GetOffset()
to := from + prm.Range.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
}
res := common.GetRangeRes{
Data: payload[from:to],
}
size = len(res.Data)
success = true
return res, nil
}

View file

@ -0,0 +1,97 @@
package blobtree
import (
"context"
"os"
"path/filepath"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
func (b *BlobTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
var (
startedAt = time.Now()
err error
)
defer func() {
b.cfg.metrics.Iterate(time.Since(startedAt), err == nil)
}()
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Iterate",
trace.WithAttributes(
attribute.String("path", b.cfg.rootPath),
attribute.Bool("ignore_errors", prm.IgnoreErrors),
))
defer span.End()
err = b.iterateDir("", prm)
return common.IterateRes{}, err
}
func (b *BlobTree) iterateDir(dir string, prm common.IteratePrm) error {
entities, err := os.ReadDir(filepath.Join(b.cfg.rootPath, dir))
if err != nil {
if prm.IgnoreErrors {
return nil
}
return err
}
for _, entity := range entities {
if entity.IsDir() {
err := b.iterateDir(filepath.Join(dir, entity.Name()), prm)
if err != nil {
return err
}
continue
}
if b.isTempFile(entity.Name()) {
continue
}
path := filepath.Join(dir, entity.Name())
err = b.iterateRecords(path, prm)
if err != nil {
return err
}
}
return nil
}
func (b *BlobTree) iterateRecords(path string, prm common.IteratePrm) error {
b.fileLock.RLock(path)
defer b.fileLock.RUnlock(path)
records, err := b.readFileContent(path)
if err != nil {
if prm.IgnoreErrors {
return nil
}
return err
}
for _, record := range records {
record.Data, err = b.compressor.Decompress(record.Data)
if err != nil {
if prm.IgnoreErrors {
continue
}
return err
}
err = prm.Handler(common.IterationElement{
Address: record.Address,
ObjectData: record.Data,
StorageID: pathToStorageID(path),
})
if err != nil {
return err
}
}
return nil
}

View file

@ -0,0 +1,34 @@
package blobtree
import "time"
type Metrics interface {
SetParentID(parentID string)
SetMode(readOnly bool)
Close()
Delete(d time.Duration, success, withStorageID bool)
Exists(d time.Duration, success, withStorageID bool)
GetRange(d time.Duration, size int, success, withStorageID bool)
Get(d time.Duration, size int, success, withStorageID bool)
Iterate(d time.Duration, success bool)
Put(d time.Duration, size int, success bool)
IncFilesCount()
DecFilesCount()
}
type noopMetrics struct{}
func (m *noopMetrics) SetParentID(string) {}
func (m *noopMetrics) SetMode(bool) {}
func (m *noopMetrics) Close() {}
func (m *noopMetrics) Delete(time.Duration, bool, bool) {}
func (m *noopMetrics) Exists(time.Duration, bool, bool) {}
func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
func (m *noopMetrics) Iterate(time.Duration, bool) {}
func (m *noopMetrics) Put(time.Duration, int, bool) {}
func (m *noopMetrics) IncFilesCount() {}
func (m *noopMetrics) DecFilesCount() {}

View file

@ -0,0 +1,35 @@
package blobtree
import "io/fs"
type Option func(*cfg)
func WithPath(path string) Option {
return func(c *cfg) {
c.rootPath = path
}
}
func WithDepth(depth uint64) Option {
return func(c *cfg) {
c.depth = depth
}
}
func WithPerm(p fs.FileMode) Option {
return func(c *cfg) {
c.permissions = p
}
}
func WithTargetSize(size uint64) Option {
return func(c *cfg) {
c.targetFileSizeBytes = size
}
}
func WithMetrics(m Metrics) Option {
return func(c *cfg) {
c.metrics = m
}
}

View file

@ -0,0 +1,121 @@
package blobtree
import (
"context"
"os"
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
const (
tempFileSymbols = "###"
)
func (b *BlobTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
var (
success bool
size int
startedAt = time.Now()
)
defer func() {
b.cfg.metrics.Put(time.Since(startedAt), size, success)
}()
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Put",
trace.WithAttributes(
attribute.String("address", prm.Address.EncodeToString()),
attribute.Bool("dont_compress", prm.DontCompress),
))
defer span.End()
if b.cfg.readOnly {
return common.PutRes{}, common.ErrReadOnly
}
dir := b.getDir(prm.Address)
if err := b.createDir(dir, false); err != nil {
return common.PutRes{}, err
}
if !prm.DontCompress {
prm.RawData = b.compressor.Compress(prm.RawData)
}
path, err := b.saveToLocalDir(prm, dir)
if err != nil {
return common.PutRes{}, err
}
success = true
size = len(prm.RawData)
return common.PutRes{StorageID: pathToStorageID(path)}, nil
}
func (b *BlobTree) saveToLocalDir(prm common.PutPrm, dir string) (string, error) {
returnIdx := true
idx := b.dispatcher.GetIdxForWrite(dir)
path := b.getFilePath(dir, idx)
b.fileLock.Lock(path)
defer b.fileLock.Unlock(path)
defer func() {
if returnIdx {
b.dispatcher.ReturnIdx(dir, idx)
}
}()
currentContent, err := b.readFileContent(path)
if err != nil {
return "", err
}
var newRecord objectData
newRecord.Address = prm.Address
newRecord.Data = prm.RawData
size, err := b.writeToTmpAndRename(append(currentContent, newRecord), path)
if err != nil {
return "", err
}
returnIdx = size < b.cfg.targetFileSizeBytes
return path, nil
}
func (b *BlobTree) writeToTmpAndRename(records []objectData, path string) (uint64, error) {
tmpPath := path + tempFileSymbols + strconv.FormatUint(b.suffix.Add(1), 16)
size, err := b.saveContentToFile(records, tmpPath)
if err != nil {
_ = os.Remove(b.getSystemPath(tmpPath))
return 0, err
}
newFile := false
_, err = os.Stat(b.getSystemPath(path))
if err != nil {
if os.IsNotExist(err) {
newFile = true
} else {
return 0, err
}
}
if err := os.Rename(b.getSystemPath(tmpPath), b.getSystemPath(path)); err != nil {
_ = os.Remove(b.getSystemPath(tmpPath))
return 0, err
}
if newFile {
b.cfg.metrics.IncFilesCount()
}
return size, nil
}

View file

@ -0,0 +1,15 @@
package blobtree
import (
"strings"
)
const storageIDPrefix = "blobtree:"
func getPathFromStorageID(storageID []byte) (string, bool) {
return strings.CutPrefix(string(storageID), storageIDPrefix)
}
func pathToStorageID(path string) []byte {
return []byte(storageIDPrefix + path)
}

View file

@ -5,7 +5,9 @@ import (
"fmt"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/memstore"
@ -81,6 +83,23 @@ var storages = []storage{
)
},
},
{
desc: "blobtree",
create: func(dir string) common.Storage {
return blobtree.New(
blobtree.WithDepth(2),
blobtree.WithPath(dir),
)
},
},
{
desc: "badger",
create: func(dir string) common.Storage {
return badgerstore.New(
badgerstore.WithPath(dir),
)
},
},
}
func BenchmarkSubstorageReadPerf(b *testing.B) {

View file

@ -0,0 +1,74 @@
package metrics
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
metrics_impl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
)
func NewBadgerStoreMetrics(path string, m metrics_impl.BadgerStoreMetrics) badgerstore.Metrics {
return &badgerStoreMetrics{
path: path,
m: m,
}
}
type badgerStoreMetrics struct {
path, shardID string
m metrics_impl.BadgerStoreMetrics
}
// Close implements badgerstore.Metrics.
func (m *badgerStoreMetrics) Close() {
m.m.Close(m.shardID, m.path)
}
// Delete implements badgerstore.Metrics.
func (m *badgerStoreMetrics) Delete(d time.Duration, success bool) {
m.m.MethodDuration(m.shardID, m.path, "Delete", d, success)
}
// Exists implements badgerstore.Metrics.
func (m *badgerStoreMetrics) Exists(d time.Duration, success bool) {
m.m.MethodDuration(m.shardID, m.path, "Exists", d, success)
}
// Get implements badgerstore.Metrics.
func (m *badgerStoreMetrics) Get(d time.Duration, size int, success bool) {
m.m.MethodDuration(m.shardID, m.path, "Get", d, success)
if success {
m.m.AddGet(m.shardID, m.path, size)
}
}
// GetRange implements badgerstore.Metrics.
func (m *badgerStoreMetrics) GetRange(d time.Duration, size int, success bool) {
m.m.MethodDuration(m.shardID, m.path, "GetRange", d, success)
if success {
m.m.AddGet(m.shardID, m.path, size)
}
}
// Iterate implements badgerstore.Metrics.
func (m *badgerStoreMetrics) Iterate(d time.Duration, success bool) {
m.m.MethodDuration(m.shardID, m.path, "Iterate", d, success)
}
// Put implements badgerstore.Metrics.
func (m *badgerStoreMetrics) Put(d time.Duration, size int, success bool) {
m.m.MethodDuration(m.shardID, m.path, "Put", d, success)
if success {
m.m.AddPut(m.shardID, m.path, size)
}
}
// SetMode implements badgerstore.Metrics.
func (m *badgerStoreMetrics) SetMode(readOnly bool) {
m.m.SetMode(m.shardID, m.path, readOnly)
}
// SetParentID implements badgerstore.Metrics.
func (m *badgerStoreMetrics) SetParentID(parentID string) {
m.shardID = parentID
}

View file

@ -8,7 +8,7 @@ import (
metrics_impl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
)
func NewBlobovniczaTreeMetrics(path string, m metrics_impl.BlobobvnizcaMetrics) blobovniczatree.Metrics {
func NewBlobovniczaTreeMetrics(path string, m metrics_impl.BlobovniczaMetrics) blobovniczatree.Metrics {
return &blobovniczaTreeMetrics{
path: path,
shardID: undefined,
@ -19,7 +19,7 @@ func NewBlobovniczaTreeMetrics(path string, m metrics_impl.BlobobvnizcaMetrics)
type blobovniczaTreeMetrics struct {
shardID string
path string
m metrics_impl.BlobobvnizcaMetrics
m metrics_impl.BlobovniczaMetrics
}
func (m *blobovniczaTreeMetrics) Blobovnicza() blobovnicza.Metrics {
@ -35,48 +35,48 @@ func (m *blobovniczaTreeMetrics) SetParentID(parentID string) {
}
func (m *blobovniczaTreeMetrics) SetMode(readOnly bool) {
m.m.SetBlobobvnizcaTreeMode(m.shardID, m.path, readOnly)
m.m.SetBlobovniczaTreeMode(m.shardID, m.path, readOnly)
}
func (m *blobovniczaTreeMetrics) Close() {
m.m.CloseBlobobvnizcaTree(m.shardID, m.path)
m.m.CloseBlobovniczaTree(m.shardID, m.path)
}
func (m *blobovniczaTreeMetrics) Delete(d time.Duration, success, withStorageID bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
}
func (m *blobovniczaTreeMetrics) Exists(d time.Duration, success, withStorageID bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Exists", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Exists", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
}
func (m *blobovniczaTreeMetrics) GetRange(d time.Duration, size int, success, withStorageID bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "GetRange", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "GetRange", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
if success {
m.m.AddBlobobvnizcaTreeGet(m.shardID, m.path, size)
m.m.AddBlobovniczaTreeGet(m.shardID, m.path, size)
}
}
func (m *blobovniczaTreeMetrics) Get(d time.Duration, size int, success, withStorageID bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Get", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Get", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
if success {
m.m.AddBlobobvnizcaTreeGet(m.shardID, m.path, size)
m.m.AddBlobovniczaTreeGet(m.shardID, m.path, size)
}
}
func (m *blobovniczaTreeMetrics) Iterate(d time.Duration, success bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Iterate", d, success, metrics_impl.NullBool{})
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Iterate", d, success, metrics_impl.NullBool{})
}
func (m *blobovniczaTreeMetrics) Put(d time.Duration, size int, success bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Put", d, success, metrics_impl.NullBool{})
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Put", d, success, metrics_impl.NullBool{})
if success {
m.m.AddBlobobvnizcaTreePut(m.shardID, m.path, size)
m.m.AddBlobovniczaTreePut(m.shardID, m.path, size)
}
}
type blobovniczaMetrics struct {
m metrics_impl.BlobobvnizcaMetrics
m metrics_impl.BlobovniczaMetrics
shardID func() string
path string
}

View file

@ -0,0 +1,74 @@
package metrics
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
metrics_impl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
)
func NewBlobTreeMetrics(path string, m metrics_impl.BlobTreeMetrics) blobtree.Metrics {
return &blobTreeMetrics{
path: path,
m: m,
}
}
type blobTreeMetrics struct {
shardID string
path string
m metrics_impl.BlobTreeMetrics
}
func (m *blobTreeMetrics) SetParentID(parentID string) {
m.shardID = parentID
}
func (m *blobTreeMetrics) SetMode(readOnly bool) {
m.m.SetBlobTreeMode(m.shardID, m.path, readOnly)
}
func (m *blobTreeMetrics) Close() {
m.m.CloseBlobTree(m.shardID, m.path)
}
func (m *blobTreeMetrics) Delete(d time.Duration, success, withStorageID bool) {
m.m.BlobTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
}
func (m *blobTreeMetrics) Exists(d time.Duration, success, withStorageID bool) {
m.m.BlobTreeMethodDuration(m.shardID, m.path, "Exists", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
}
func (m *blobTreeMetrics) GetRange(d time.Duration, size int, success, withStorageID bool) {
m.m.BlobTreeMethodDuration(m.shardID, m.path, "GetRange", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
if success {
m.m.AddBlobTreeGet(m.shardID, m.path, size)
}
}
func (m *blobTreeMetrics) Get(d time.Duration, size int, success, withStorageID bool) {
m.m.BlobTreeMethodDuration(m.shardID, m.path, "Get", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
if success {
m.m.AddBlobTreeGet(m.shardID, m.path, size)
}
}
func (m *blobTreeMetrics) Iterate(d time.Duration, success bool) {
m.m.BlobTreeMethodDuration(m.shardID, m.path, "Iterate", d, success, metrics_impl.NullBool{})
}
func (m *blobTreeMetrics) Put(d time.Duration, size int, success bool) {
m.m.BlobTreeMethodDuration(m.shardID, m.path, "Put", d, success, metrics_impl.NullBool{})
if success {
m.m.AddBlobTreePut(m.shardID, m.path, size)
}
}
func (m *blobTreeMetrics) IncFilesCount() {
m.m.IncBlobTreeFilesCount(m.shardID, m.path)
}
func (m *blobTreeMetrics) DecFilesCount() {
m.m.DecBlobTreeFilesCount(m.shardID, m.path)
}

View file

@ -0,0 +1,98 @@
package metrics
import (
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type BadgerStoreMetrics interface {
SetMode(shardID, path string, readOnly bool)
Close(shardID, path string)
MethodDuration(shardID, path string, method string, d time.Duration, success bool)
AddPut(shardID, path string, size int)
AddGet(shardID, path string, size int)
}
var _ BadgerStoreMetrics = (*badgerStoreMetrics)(nil)
type badgerStoreMetrics struct {
mode *shardIDPathModeValue
reqDuration *prometheus.HistogramVec
put *prometheus.CounterVec
get *prometheus.CounterVec
}
func newbadgerStoreMetrics() *badgerStoreMetrics {
return &badgerStoreMetrics{
mode: newShardIDPathMode(badgerStoreSubSystem, "mode", "BadgerStore mode"),
reqDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: badgerStoreSubSystem,
Name: "request_duration_seconds",
Help: "Accumulated BadgerStore request process duration",
}, []string{shardIDLabel, pathLabel, successLabel, methodLabel}),
put: metrics.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: badgerStoreSubSystem,
Name: "put_bytes",
Help: "Accumulated payload size written to BadgerStore",
}, []string{shardIDLabel, pathLabel}),
get: metrics.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: badgerStoreSubSystem,
Name: "get_bytes",
Help: "Accumulated payload size read from BadgerStore",
}, []string{shardIDLabel, pathLabel}),
}
}
// AddGet implements BadgerStoreMetrics.
func (b *badgerStoreMetrics) AddGet(shardID string, path string, size int) {
b.get.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Add(float64(size))
}
// AddPut implements BadgerStoreMetrics.
func (b *badgerStoreMetrics) AddPut(shardID string, path string, size int) {
b.put.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Add(float64(size))
}
// Close implements BadgerStoreMetrics.
func (b *badgerStoreMetrics) Close(shardID string, path string) {
b.mode.SetMode(shardID, path, closedMode)
b.reqDuration.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.get.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.put.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
}
// MethodDuration implements BadgerStoreMetrics.
func (b *badgerStoreMetrics) MethodDuration(shardID string, path string, method string, d time.Duration, success bool) {
b.reqDuration.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
successLabel: strconv.FormatBool(success),
methodLabel: method,
}).Observe(d.Seconds())
}
// SetMode implements BadgerStoreMetrics.
func (b *badgerStoreMetrics) SetMode(shardID string, path string, readOnly bool) {
b.mode.SetMode(shardID, path, modeFromBool(readOnly))
}

View file

@ -8,12 +8,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
type BlobobvnizcaMetrics interface {
SetBlobobvnizcaTreeMode(shardID, path string, readOnly bool)
CloseBlobobvnizcaTree(shardID, path string)
BlobobvnizcaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool)
AddBlobobvnizcaTreePut(shardID, path string, size int)
AddBlobobvnizcaTreeGet(shardID, path string, size int)
type BlobovniczaMetrics interface {
SetBlobovniczaTreeMode(shardID, path string, readOnly bool)
CloseBlobovniczaTree(shardID, path string)
BlobovniczaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool)
AddBlobovniczaTreePut(shardID, path string, size int)
AddBlobovniczaTreeGet(shardID, path string, size int)
AddOpenBlobovniczaSize(shardID, path string, size uint64)
SubOpenBlobovniczaSize(shardID, path string, size uint64)
@ -78,11 +78,11 @@ func newBlobovnicza() *blobovnicza {
}
}
func (b *blobovnicza) SetBlobobvnizcaTreeMode(shardID, path string, readOnly bool) {
func (b *blobovnicza) SetBlobovniczaTreeMode(shardID, path string, readOnly bool) {
b.treeMode.SetMode(shardID, path, modeFromBool(readOnly))
}
func (b *blobovnicza) CloseBlobobvnizcaTree(shardID, path string) {
func (b *blobovnicza) CloseBlobovniczaTree(shardID, path string) {
b.treeMode.SetMode(shardID, path, closedMode)
b.treeReqDuration.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
@ -96,9 +96,21 @@ func (b *blobovnicza) CloseBlobobvnizcaTree(shardID, path string) {
shardIDLabel: shardID,
pathLabel: path,
})
b.treeOpenSize.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.treeOpenItems.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.treeOpenCounter.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
}
func (b *blobovnicza) BlobobvnizcaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) {
func (b *blobovnicza) BlobovniczaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) {
b.treeReqDuration.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
@ -108,14 +120,14 @@ func (b *blobovnicza) BlobobvnizcaTreeMethodDuration(shardID, path string, metho
}).Observe(d.Seconds())
}
func (b *blobovnicza) AddBlobobvnizcaTreePut(shardID, path string, size int) {
func (b *blobovnicza) AddBlobovniczaTreePut(shardID, path string, size int) {
b.treePut.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Add(float64(size))
}
func (b *blobovnicza) AddBlobobvnizcaTreeGet(shardID, path string, size int) {
func (b *blobovnicza) AddBlobovniczaTreeGet(shardID, path string, size int) {
b.treeGet.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,

120
pkg/metrics/blobtree.go Normal file
View file

@ -0,0 +1,120 @@
package metrics
import (
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
"github.com/prometheus/client_golang/prometheus"
)
type BlobTreeMetrics interface {
SetBlobTreeMode(shardID, path string, readOnly bool)
CloseBlobTree(shardID, path string)
BlobTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool)
IncBlobTreeFilesCount(shardID, path string)
DecBlobTreeFilesCount(shardID, path string)
AddBlobTreePut(shardID, path string, size int)
AddBlobTreeGet(shardID, path string, size int)
}
type blobTreeMetrics struct {
mode *shardIDPathModeValue
reqDuration *prometheus.HistogramVec
put *prometheus.CounterVec
get *prometheus.CounterVec
filesCount *prometheus.GaugeVec
}
func newBlobTreeMetrics() *blobTreeMetrics {
return &blobTreeMetrics{
mode: newShardIDPathMode(blobTreeSubSystem, "mode", "Blob tree mode"),
reqDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: blobTreeSubSystem,
Name: "request_duration_seconds",
Help: "Accumulated Blob tree request process duration",
}, []string{shardIDLabel, pathLabel, successLabel, methodLabel, withStorageIDLabel}),
put: metrics.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: blobTreeSubSystem,
Name: "put_bytes",
Help: "Accumulated payload size written to Blob tree",
}, []string{shardIDLabel, pathLabel}),
get: metrics.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: blobTreeSubSystem,
Name: "get_bytes",
Help: "Accumulated payload size read from Blob tree",
}, []string{shardIDLabel, pathLabel}),
filesCount: metrics.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: blobTreeSubSystem,
Name: "files_count",
Help: "Count of data files in Blob tree",
}, []string{shardIDLabel, pathLabel}),
}
}
func (b *blobTreeMetrics) SetBlobTreeMode(shardID, path string, readOnly bool) {
b.mode.SetMode(shardID, path, modeFromBool(readOnly))
}
func (b *blobTreeMetrics) CloseBlobTree(shardID, path string) {
b.mode.SetMode(shardID, path, closedMode)
b.reqDuration.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.get.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.put.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.filesCount.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
}
func (b *blobTreeMetrics) BlobTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) {
b.reqDuration.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
successLabel: strconv.FormatBool(success),
methodLabel: method,
withStorageIDLabel: withStorageID.String(),
}).Observe(d.Seconds())
}
func (b *blobTreeMetrics) IncBlobTreeFilesCount(shardID, path string) {
b.filesCount.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Inc()
}
func (b *blobTreeMetrics) DecBlobTreeFilesCount(shardID, path string) {
b.filesCount.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Dec()
}
func (b *blobTreeMetrics) AddBlobTreePut(shardID, path string, size int) {
b.put.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Add(float64(size))
}
func (b *blobTreeMetrics) AddBlobTreeGet(shardID, path string, size int) {
b.get.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Add(float64(size))
}

View file

@ -7,6 +7,7 @@ const (
fstreeSubSystem = "fstree"
blobstoreSubSystem = "blobstore"
blobovniczaTreeSubSystem = "blobovnicza_tree"
blobTreeSubSystem = "blobtree"
metabaseSubSystem = "metabase"
piloramaSubSystem = "pilorama"
engineSubsystem = "engine"
@ -21,6 +22,7 @@ const (
writeCacheSubsystem = "writecache"
grpcServerSubsystem = "grpc_server"
policerSubsystem = "policer"
badgerStoreSubSystem = "badgerstore"
successLabel = "success"
shardIDLabel = "shard_id"

View file

@ -16,10 +16,12 @@ type NodeMetrics struct {
epoch prometheus.Gauge
fstree *fstreeMetrics
blobstore *blobstoreMetrics
blobobvnizca *blobovnicza
blobovnicza *blobovnicza
metabase *metabaseMetrics
pilorama *piloramaMetrics
grpc *grpcServerMetrics
blobTree *blobTreeMetrics
badgerStore *badgerStoreMetrics
policer *policerMetrics
morphClient *morphClientMetrics
morphCache *morphCacheMetrics
@ -39,16 +41,18 @@ func NewNodeMetrics() *NodeMetrics {
Name: "epoch",
Help: "Current epoch as seen by inner-ring node.",
}),
fstree: newFSTreeMetrics(),
blobstore: newBlobstoreMetrics(),
blobobvnizca: newBlobovnicza(),
metabase: newMetabaseMetrics(),
pilorama: newPiloramaMetrics(),
grpc: newGrpcServerMetrics(),
policer: newPolicerMetrics(),
morphClient: newMorphClientMetrics(),
morphCache: newMorphCacheMetrics(namespace),
log: logger.NewLogMetrics(namespace),
fstree: newFSTreeMetrics(),
blobstore: newBlobstoreMetrics(),
blobovnicza: newBlobovnicza(),
metabase: newMetabaseMetrics(),
pilorama: newPiloramaMetrics(),
grpc: newGrpcServerMetrics(),
policer: newPolicerMetrics(),
morphClient: newMorphClientMetrics(),
morphCache: newMorphCacheMetrics(namespace),
log: logger.NewLogMetrics(namespace),
blobTree: newBlobTreeMetrics(),
badgerStore: newbadgerStoreMetrics(),
}
}
@ -85,8 +89,8 @@ func (m *NodeMetrics) Blobstore() BlobstoreMetrics {
return m.blobstore
}
func (m *NodeMetrics) BlobobvnizcaTreeMetrics() BlobobvnizcaMetrics {
return m.blobobvnizca
func (m *NodeMetrics) BlobovniczaTreeMetrics() BlobovniczaMetrics {
return m.blobovnicza
}
func (m *NodeMetrics) MetabaseMetrics() MetabaseMetrics {
@ -116,3 +120,11 @@ func (m *NodeMetrics) MorphCacheMetrics() MorphCacheMetrics {
func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
return m.log
}
func (m *NodeMetrics) BlobTreeMetrics() BlobTreeMetrics {
return m.blobTree
}
func (m *NodeMetrics) BadgerStoreMetrics() BadgerStoreMetrics {
return m.badgerStore
}

View file

@ -3,8 +3,8 @@ package sync
import "sync"
type locker struct {
mtx sync.Mutex
waiters int // not protected by mtx, must used outer mutex to update concurrently
mtx sync.RWMutex
userCount int // not protected by mtx, must used outer mutex to update concurrently
}
type KeyLocker[K comparable] struct {
@ -19,26 +19,50 @@ func NewKeyLocker[K comparable]() *KeyLocker[K] {
}
func (l *KeyLocker[K]) Lock(key K) {
l.lock(key, false)
}
func (l *KeyLocker[K]) RLock(key K) {
l.lock(key, true)
}
func (l *KeyLocker[K]) lock(key K, read bool) {
l.lockersMtx.Lock()
if locker, found := l.lockers[key]; found {
locker.waiters++
locker.userCount++
l.lockersMtx.Unlock()
locker.mtx.Lock()
if read {
locker.mtx.RLock()
} else {
locker.mtx.Lock()
}
return
}
locker := &locker{
waiters: 1,
userCount: 1,
}
if read {
locker.mtx.RLock()
} else {
locker.mtx.Lock()
}
locker.mtx.Lock()
l.lockers[key] = locker
l.lockersMtx.Unlock()
}
func (l *KeyLocker[K]) Unlock(key K) {
l.unlock(key, false)
}
func (l *KeyLocker[K]) RUnlock(key K) {
l.unlock(key, true)
}
func (l *KeyLocker[K]) unlock(key K, read bool) {
l.lockersMtx.Lock()
defer l.lockersMtx.Unlock()
@ -47,10 +71,14 @@ func (l *KeyLocker[K]) Unlock(key K) {
return
}
if locker.waiters == 1 {
if locker.userCount == 1 {
delete(l.lockers, key)
}
locker.waiters--
locker.userCount--
locker.mtx.Unlock()
if read {
locker.mtx.RUnlock()
} else {
locker.mtx.Unlock()
}
}

View file

@ -9,7 +9,7 @@ import (
"golang.org/x/sync/errgroup"
)
func TestKeyLocker(t *testing.T) {
func TestKeyLockerWrite(t *testing.T) {
taken := false
eg, _ := errgroup.WithContext(context.Background())
keyLocker := NewKeyLocker[int]()
@ -30,3 +30,17 @@ func TestKeyLocker(t *testing.T) {
}
require.NoError(t, eg.Wait())
}
func TestKeyLockerRead(t *testing.T) {
eg, _ := errgroup.WithContext(context.Background())
keyLocker := NewKeyLocker[int]()
for i := 0; i < 100; i++ {
eg.Go(func() error {
keyLocker.RLock(0)
defer keyLocker.RUnlock(0)
return nil
})
}
require.NoError(t, eg.Wait())
}