diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index cc106cf95..c52c44e15 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -21,7 +21,9 @@ import ( contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts" engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine" shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" + badgerstoreconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore" blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza" + blobtreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobtree" fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree" loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger" nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node" @@ -32,7 +34,9 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" @@ -181,6 +185,14 @@ type subStorageCfg struct { width uint64 leafWidth uint64 openedCacheSize int + + // badgerstore-specific + indexCacheSize int64 + memTablesCount int + compactorsCount int + gcInterval time.Duration + gcDiscardRatio float64 + valueLogFileSize int64 } // readConfig fills applicationConfiguration with raw configuration values @@ -302,6 +314,18 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sCfg.depth = sub.Depth() sCfg.noSync = sub.NoSync() + case blobtree.Type: + sub := blobtreeconfig.From((*config.Config)(storagesCfg[i])) + sCfg.depth = sub.Depth() + sCfg.size = sub.Size() + case badgerstore.Type: + sub := badgerstoreconfig.From((*config.Config)(storagesCfg[i])) + sCfg.indexCacheSize = sub.IndexCacheSize() + sCfg.memTablesCount = sub.MemTablesCount() + sCfg.compactorsCount = sub.CompactorsCount() + sCfg.gcInterval = sub.GCInterval() + sCfg.gcDiscardRatio = sub.GCDiscardRatio() + sCfg.valueLogFileSize = sub.ValueLogFileSize() default: return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type()) } @@ -788,52 +812,37 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage { for _, sRead := range shCfg.subStorages { switch sRead.typ { case blobovniczatree.Type: - blobTreeOpts := []blobovniczatree.Option{ - blobovniczatree.WithRootPath(sRead.path), - blobovniczatree.WithPermissions(sRead.perm), - blobovniczatree.WithBlobovniczaSize(sRead.size), - blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth), - blobovniczatree.WithBlobovniczaShallowWidth(sRead.width), - blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth), - blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize), - blobovniczatree.WithLogger(c.log), - } - - if c.metricsCollector != nil { - blobTreeOpts = append(blobTreeOpts, - blobovniczatree.WithMetrics( - lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobobvnizcaTreeMetrics()), - ), - ) - } + blobovniczaTreeOpts := c.getBlobovniczaTreeOpts(sRead) ss = append(ss, blobstor.SubStorage{ - Storage: blobovniczatree.NewBlobovniczaTree(blobTreeOpts...), + Storage: blobovniczatree.NewBlobovniczaTree(blobovniczaTreeOpts...), Policy: func(_ *objectSDK.Object, data []byte) bool { return uint64(len(data)) < shCfg.smallSizeObjectLimit }, }) case fstree.Type: - fstreeOpts := []fstree.Option{ - fstree.WithPath(sRead.path), - fstree.WithPerm(sRead.perm), - fstree.WithDepth(sRead.depth), - fstree.WithNoSync(sRead.noSync), - fstree.WithLogger(c.log), - } - if c.metricsCollector != nil { - fstreeOpts = append(fstreeOpts, - fstree.WithMetrics( - lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()), - ), - ) - } - + fstreeOpts := c.getFSTreeOpts(sRead) ss = append(ss, blobstor.SubStorage{ Storage: fstree.New(fstreeOpts...), Policy: func(_ *objectSDK.Object, data []byte) bool { return true }, }) + case blobtree.Type: + blobTreeOpts := c.getBlobTreeOpts(sRead) + ss = append(ss, blobstor.SubStorage{ + Storage: blobtree.New(blobTreeOpts...), + Policy: func(_ *objectSDK.Object, data []byte) bool { + return uint64(len(data)) < shCfg.smallSizeObjectLimit + }, + }) + case badgerstore.Type: + badgerStoreOpts := c.getBadgerStoreOpts(sRead) + ss = append(ss, blobstor.SubStorage{ + Storage: badgerstore.New(badgerStoreOpts...), + Policy: func(_ *objectSDK.Object, data []byte) bool { + return uint64(len(data)) < shCfg.smallSizeObjectLimit + }, + }) default: // should never happen, that has already // been handled: when the config was read @@ -842,6 +851,82 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage { return ss } +func (c *cfg) getBadgerStoreOpts(sRead subStorageCfg) []badgerstore.Option { + badgerStoreOpts := []badgerstore.Option{ + badgerstore.WithPath(sRead.path), + badgerstore.WithPermissions(sRead.perm), + badgerstore.WithCompactorsCount(sRead.compactorsCount), + badgerstore.WithGCDiscardRatio(sRead.gcDiscardRatio), + badgerstore.WithGCInterval(sRead.gcInterval), + badgerstore.WithIndexCacheSize(sRead.indexCacheSize), + badgerstore.WithMemTablesCount(sRead.memTablesCount), + badgerstore.WithValueLogSize(sRead.valueLogFileSize), + } + if c.metricsCollector != nil { + badgerStoreOpts = append(badgerStoreOpts, + badgerstore.WithMetrics( + lsmetrics.NewBadgerStoreMetrics(sRead.path, c.metricsCollector.BadgerStoreMetrics()))) + } + return badgerStoreOpts +} + +func (c *cfg) getBlobTreeOpts(sRead subStorageCfg) []blobtree.Option { + blobTreeOpts := []blobtree.Option{ + blobtree.WithPath(sRead.path), + blobtree.WithPerm(sRead.perm), + blobtree.WithDepth(sRead.depth), + blobtree.WithTargetSize(sRead.size), + } + if c.metricsCollector != nil { + blobTreeOpts = append(blobTreeOpts, + blobtree.WithMetrics( + lsmetrics.NewBlobTreeMetrics(sRead.path, c.metricsCollector.BlobTreeMetrics()), + ), + ) + } + return blobTreeOpts +} + +func (c *cfg) getFSTreeOpts(sRead subStorageCfg) []fstree.Option { + fstreeOpts := []fstree.Option{ + fstree.WithPath(sRead.path), + fstree.WithPerm(sRead.perm), + fstree.WithDepth(sRead.depth), + fstree.WithNoSync(sRead.noSync), + fstree.WithLogger(c.log), + } + if c.metricsCollector != nil { + fstreeOpts = append(fstreeOpts, + fstree.WithMetrics( + lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()), + ), + ) + } + return fstreeOpts +} + +func (c *cfg) getBlobovniczaTreeOpts(sRead subStorageCfg) []blobovniczatree.Option { + blobTreeOpts := []blobovniczatree.Option{ + blobovniczatree.WithRootPath(sRead.path), + blobovniczatree.WithPermissions(sRead.perm), + blobovniczatree.WithBlobovniczaSize(sRead.size), + blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth), + blobovniczatree.WithBlobovniczaShallowWidth(sRead.width), + blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth), + blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize), + blobovniczatree.WithLogger(c.log), + } + + if c.metricsCollector != nil { + blobTreeOpts = append(blobTreeOpts, + blobovniczatree.WithMetrics( + lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobovniczaTreeMetrics()), + ), + ) + } + return blobTreeOpts +} + func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID { writeCacheOpts := c.getWriteCacheOpts(shCfg) piloramaOpts := c.getPiloramaOpts(shCfg) diff --git a/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore/config.go b/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore/config.go new file mode 100644 index 000000000..fe8f4f629 --- /dev/null +++ b/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore/config.go @@ -0,0 +1,85 @@ +package badgerstore + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore" +) + +type Config config.Config + +const ( + IndexCacheSizeDefault = 256 << 20 //256MB + MemTablesCountDefault = 32 + CompactorsCountDefault = 64 + GCIntervalDefault = 10 * time.Minute + GCDiscardRatioDefault = 0.2 + ValueLogSizeDefault = 1 << 30 //1GB +) + +// From wraps config section into Config. +func From(c *config.Config) *Config { + return (*Config)(c) +} + +// Type returns the storage type. +func (x *Config) Type() string { + return badgerstore.Type +} + +// IndexCacheSize returns `index_cache_size` value or IndexCacheSizeDefault. +func (x *Config) IndexCacheSize() int64 { + s := config.SizeInBytesSafe((*config.Config)(x), "index_cache_size") + if s > 0 { + return int64(s) + } + + return IndexCacheSizeDefault +} + +// MemTablesCount returns `mem_tables_count` value or MemTablesCountDefault. +func (x *Config) MemTablesCount() int { + v := config.IntSafe((*config.Config)(x), "mem_tables_count") + if v > 0 { + return int(v) + } + return MemTablesCountDefault +} + +// CompactorsCount returns `mem_tables_count` value or CompactorsCountDefault. +func (x *Config) CompactorsCount() int { + v := config.IntSafe((*config.Config)(x), "compactors_count") + if v > 0 { + return int(v) + } + return CompactorsCountDefault +} + +// GCInterval returns `gc_interval` value or GCIntervalDefault. +func (x *Config) GCInterval() time.Duration { + v := config.DurationSafe((*config.Config)(x), "gc_interval") + if v > 0 { + return v + } + return GCIntervalDefault +} + +// GCDiscardRatio returns `gc_discard_percent` value as ratio value (in range (0.0; 1.0)) or GCDiscardRatioDefault. +func (x *Config) GCDiscardRatio() float64 { + v := config.Uint32Safe((*config.Config)(x), "gc_discard_percent") + if v > 0 && v < 100 { + return float64(v) / (float64(100)) + } + return GCDiscardRatioDefault +} + +// ValueLogFileSize returns `value_log_file_size` value or ValueLogSizeDefault. +func (x *Config) ValueLogFileSize() int64 { + s := config.SizeInBytesSafe((*config.Config)(x), "value_log_file_size") + if s > 0 { + return int64(s) + } + + return ValueLogSizeDefault +} diff --git a/cmd/frostfs-node/config/engine/shard/blobstor/blobtree/config.go b/cmd/frostfs-node/config/engine/shard/blobstor/blobtree/config.go new file mode 100644 index 000000000..47b653a45 --- /dev/null +++ b/cmd/frostfs-node/config/engine/shard/blobstor/blobtree/config.go @@ -0,0 +1,60 @@ +package blobtreeconfig + +import ( + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree" +) + +// Config is a wrapper over the config section +// which provides access to Blobtree configurations. +type Config config.Config + +const ( + // SizeDefault is a default limit of estimates of single Blobtree file size. + SizeDefault = 4 * 1024 * 1024 + + // DepthDefault is a default shallow dir depth. + DepthDefault = 8 +) + +// From wraps config section into Config. +func From(c *config.Config) *Config { + return (*Config)(c) +} + +// Type returns the storage type. +func (x *Config) Type() string { + return blobtree.Type +} + +// Size returns the value of "size" config parameter. +// +// Returns SizeDefault if the value is not a positive number. +func (x *Config) Size() uint64 { + s := config.SizeInBytesSafe( + (*config.Config)(x), + "size", + ) + + if s > 0 { + return s + } + + return SizeDefault +} + +// ShallowDepth returns the value of "depth" config parameter. +// +// Returns ShallowDepthDefault if the value is not a positive number. +func (x *Config) Depth() uint64 { + d := config.UintSafe( + (*config.Config)(x), + "depth", + ) + + if d > 0 { + return d + } + + return DepthDefault +} diff --git a/cmd/frostfs-node/validate.go b/cmd/frostfs-node/validate.go index 80c90ec44..60690d2a6 100644 --- a/cmd/frostfs-node/validate.go +++ b/cmd/frostfs-node/validate.go @@ -9,7 +9,9 @@ import ( shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger" treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" ) @@ -55,7 +57,7 @@ func validateConfig(c *config.Config) error { } for i := range blobstor { switch blobstor[i].Type() { - case fstree.Type, blobovniczatree.Type: + case fstree.Type, blobovniczatree.Type, blobtree.Type, badgerstore.Type: default: return fmt.Errorf("unexpected storage type: %s (shard %d)", blobstor[i].Type(), shardNum) } diff --git a/internal/logs/logs.go b/internal/logs/logs.go index e8472357c..7a7fc1a98 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -515,5 +515,6 @@ const ( RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped" FailedToCountWritecacheItems = "failed to count writecache items" AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza" + BadgerStoreGCFailed = "failed to run GC on badgerstore" FailedToGetContainerCounters = "failed to get container counters values" ) diff --git a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go index 8d701ae5c..7fe6a7657 100644 --- a/pkg/local_object_storage/blobovnicza/blobovnicza_test.go +++ b/pkg/local_object_storage/blobovnicza/blobovnicza_test.go @@ -97,7 +97,7 @@ func TestBlobovnicza(t *testing.T) { testPutGet(t, blz, oidtest.Address(), objSizeLim, nil, nil) } - // blobovnizca accepts object event if full + // blobovnicza accepts object event if full testPutGet(t, blz, oidtest.Address(), 1024, func(err error) bool { return err == nil }, nil) diff --git a/pkg/local_object_storage/blobstor/badgerstore/config.go b/pkg/local_object_storage/blobstor/badgerstore/config.go new file mode 100644 index 000000000..9156f63f2 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/config.go @@ -0,0 +1,136 @@ +package badgerstore + +import ( + "io/fs" + "math" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "github.com/dgraph-io/badger/v4" + "github.com/dgraph-io/badger/v4/options" + "go.uber.org/zap" +) + +type cfg struct { + permissions fs.FileMode + compression *compression.Config + db badger.Options + gcTimeout time.Duration + gcDiscardRatio float64 + metrics Metrics + logger *logger.Logger +} + +type Option func(*cfg) + +// defaultCfg creates default options to create Store. +// Default Badger options: +// BaseTableSize: 2MB +// BaseLevelSize: 10MB +// TableSizeMultiplier: 2 +// LevelSizeMultiplier: 10 +// MaxLevels: 7 +// NumLevelZeroTables: 5 +// ValueLogFileSize: 1GB +// +// Badger flushes MemTable directly to Level0. +// So for Level0 MemTableSize is used as TableSize https://github.com/dgraph-io/badger/blob/v4.1.0/levels.go#L403. +// There is no total size limit for Level0, only NumLevelZeroTables +// +// Badger uses Dynamic Level Sizes like RocksDB. +// See https://github.com/facebook/rocksdb/blob/v3.11/include/rocksdb/options.h#L366 for explanation. +func defaultCfg() *cfg { + opts := badger.DefaultOptions("/") + opts.BlockCacheSize = 0 // compression and encryption are disabled, so block cache should be disabled + opts.IndexCacheSize = 256 << 20 // 256MB, to not to keep all indicies in memory + opts.Compression = options.None // performed by cfg.compressor + opts.Logger = nil + opts.MetricsEnabled = false + opts.NumLevelZeroTablesStall = math.MaxInt // to not to stall because of Level0 slow compaction + opts.NumMemtables = 32 // default memtable size is 64MB, so max memory consumption will be 2GB before stall + opts.NumCompactors = 64 + opts.SyncWrites = true + opts.ValueLogMaxEntries = math.MaxUint32 // default vLog file size is 1GB, so size is more clear than entries count + opts.ValueThreshold = 0 // to store all values in vLog + + return &cfg{ + permissions: 0o700, + db: opts, + gcTimeout: 10 * time.Minute, + gcDiscardRatio: 0.2, // for 1GB vLog file GC will perform only if around 200MB could be free + metrics: &noopMetrics{}, + logger: &logger.Logger{Logger: zap.L()}, + } +} + +// WithPath sets BadgerStore directory. +func WithPath(dir string) Option { + return func(c *cfg) { + c.db.Dir = dir + c.db.ValueDir = dir + } +} + +// WithPermissions sets persmission flags. +func WithPermissions(p fs.FileMode) Option { + return func(c *cfg) { + c.permissions = p + } +} + +// WithIndexCacheSize sets BadgerStore index cache size. +func WithIndexCacheSize(sz int64) Option { + return func(c *cfg) { + c.db.IndexCacheSize = sz + } +} + +// WithMemTablesCount sets maximum count of memtables. +func WithMemTablesCount(count int) Option { + return func(c *cfg) { + c.db.NumMemtables = count + } +} + +// WithCompactorsCount sets count of concurrent compactors. +func WithCompactorsCount(count int) Option { + return func(c *cfg) { + c.db.NumCompactors = count + } +} + +// WithGCInterval sets GC interval value. +func WithGCInterval(d time.Duration) Option { + return func(c *cfg) { + c.gcTimeout = d + } +} + +// WithGCDiscardRatio sets GC discard ratio. +func WithGCDiscardRatio(r float64) Option { + return func(c *cfg) { + c.gcDiscardRatio = r + } +} + +// WithValueLogSize sets max value log size. +func WithValueLogSize(sz int64) Option { + return func(c *cfg) { + c.db.ValueLogFileSize = sz + } +} + +// WithMetrics sets metrics. +func WithMetrics(m Metrics) Option { + return func(c *cfg) { + c.metrics = m + } +} + +// WithLogger sets logger. +func WithLogger(l *logger.Logger) Option { + return func(c *cfg) { + c.logger = l + } +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/control.go b/pkg/local_object_storage/blobstor/badgerstore/control.go new file mode 100644 index 000000000..321094080 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/control.go @@ -0,0 +1,97 @@ +package badgerstore + +import ( + "context" + "fmt" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + "github.com/dgraph-io/badger/v4" + "go.uber.org/zap" +) + +// Close implements common.Storage. +func (s *Store) Close() error { + s.modeMtx.Lock() + defer s.modeMtx.Unlock() + + if !s.opened { + return nil + } + + if s.gcCancel != nil { + s.gcCancel() + } + s.wg.Wait() + + if err := s.db.Close(); err != nil { + return err + } + s.opened = false + s.cfg.metrics.Close() + return nil +} + +// Init implements common.Storage. +func (s *Store) Init() error { + s.modeMtx.Lock() + defer s.modeMtx.Unlock() + + if !s.opened { + return fmt.Errorf("store must be opened before initialization") + } + + s.startGC() + + return nil +} + +func (s *Store) startGC() { + ctx, cancel := context.WithCancel(context.Background()) + s.gcCancel = cancel + + t := time.NewTicker(s.cfg.gcTimeout) + s.wg.Add(1) + + go func() { + defer s.wg.Done() + + select { + case <-ctx.Done(): + return + case <-t.C: + if err := s.db.RunValueLogGC(s.cfg.gcDiscardRatio); err == nil { + _ = s.db.RunValueLogGC(s.cfg.gcDiscardRatio) // see https://dgraph.io/docs/badger/get-started/#garbage-collection + } else { + s.cfg.logger.Error(logs.BadgerStoreGCFailed, zap.Error(err), zap.String("path", s.cfg.db.Dir)) + } + } + }() +} + +// Open implements common.Storage. +func (s *Store) Open(readOnly bool) error { + s.modeMtx.Lock() + defer s.modeMtx.Unlock() + + if s.opened { + return nil + } + + err := util.MkdirAllX(s.cfg.db.Dir, s.cfg.permissions) + if err != nil { + return err + } + s.cfg.db.ReadOnly = readOnly + if s.db, err = badger.Open(s.cfg.db); err != nil { + return err + } + s.opened = true + s.cfg.metrics.SetMode(readOnly) + return nil +} + +func (s *Store) readOnly() bool { + return s.cfg.db.ReadOnly +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/delete.go b/pkg/local_object_storage/blobstor/badgerstore/delete.go new file mode 100644 index 000000000..34fcf986f --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/delete.go @@ -0,0 +1,55 @@ +package badgerstore + +import ( + "context" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "github.com/dgraph-io/badger/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Delete implements common.Storage. +func (s *Store) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) { + success := false + startedAt := time.Now() + + defer func() { + s.cfg.metrics.Delete(time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Delete", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.String("address", prm.Address.EncodeToString()), + )) + defer span.End() + + if s.readOnly() { + return common.DeleteRes{}, common.ErrReadOnly + } + + tx := s.db.NewTransaction(true) + defer tx.Discard() + + _, err := tx.Get(key(prm.Address)) + if err != nil { + if err == badger.ErrKeyNotFound { + return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return common.DeleteRes{}, err + } + + if err = tx.Delete(key(prm.Address)); err != nil { + return common.DeleteRes{}, err + } + if err = tx.Commit(); err != nil { + return common.DeleteRes{}, err + } + success = true + return common.DeleteRes{}, nil +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/exists.go b/pkg/local_object_storage/blobstor/badgerstore/exists.go new file mode 100644 index 000000000..12f85801f --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/exists.go @@ -0,0 +1,46 @@ +package badgerstore + +import ( + "context" + "encoding/hex" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "github.com/dgraph-io/badger/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Exists implements common.Storage. +func (s *Store) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) { + success := false + startedAt := time.Now() + + defer func() { + s.cfg.metrics.Exists(time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Exists", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + )) + defer span.End() + + tx := s.db.NewTransaction(false) + defer tx.Discard() + + _, err := tx.Get(key(prm.Address)) + if err != nil { + if err == badger.ErrKeyNotFound { + success = true + return common.ExistsRes{Exists: false}, nil + } + return common.ExistsRes{}, err + } + + success = true + return common.ExistsRes{Exists: true}, nil +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/generic_test.go b/pkg/local_object_storage/blobstor/badgerstore/generic_test.go new file mode 100644 index 000000000..d34981471 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/generic_test.go @@ -0,0 +1,39 @@ +package badgerstore + +import ( + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest" +) + +func TestGeneric(t *testing.T) { + const maxObjectSize = 1 << 16 + + helper := func(t *testing.T, dir string) common.Storage { + return New(WithPath(dir)) + } + + newStore := func(t *testing.T) common.Storage { + return helper(t, t.TempDir()) + } + + blobstortest.TestAll(t, newStore, 1024, maxObjectSize) + + t.Run("info", func(t *testing.T) { + dir := t.TempDir() + blobstortest.TestInfo(t, func(t *testing.T) common.Storage { + return helper(t, dir) + }, Type, dir) + }) +} + +func TestControl(t *testing.T) { + const maxObjectSize = 2048 + + newStore := func(t *testing.T) common.Storage { + return New(WithPath(t.TempDir())) + } + + blobstortest.TestControl(t, newStore, 1024, maxObjectSize) +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/get.go b/pkg/local_object_storage/blobstor/badgerstore/get.go new file mode 100644 index 000000000..c56ec86c0 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/get.go @@ -0,0 +1,127 @@ +package badgerstore + +import ( + "context" + "encoding/hex" + "fmt" + "strconv" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/dgraph-io/badger/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Get implements common.Storage. +func (s *Store) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) { + success := false + size := 0 + startedAt := time.Now() + + defer func() { + s.cfg.metrics.Get(time.Since(startedAt), size, success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Get", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + attribute.Bool("raw", prm.Raw), + )) + defer span.End() + + data, err := s.getObjectData(prm.Address) + if err != nil { + return common.GetRes{}, err + } + + data, err = s.cfg.compression.Decompress(data) + if err != nil { + return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err) + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err) + } + + success = true + size = len(data) + return common.GetRes{Object: obj, RawData: data}, nil +} + +// GetRange implements common.Storage. +func (s *Store) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) { + success := false + size := 0 + startedAt := time.Now() + + defer func() { + s.cfg.metrics.GetRange(time.Since(startedAt), size, success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.GetRange", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", hex.EncodeToString(prm.StorageID)), + attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)), + attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)), + )) + defer span.End() + + data, err := s.getObjectData(prm.Address) + if err != nil { + return common.GetRangeRes{}, err + } + + data, err = s.cfg.compression.Decompress(data) + if err != nil { + return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err) + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err) + } + + from := prm.Range.GetOffset() + to := from + prm.Range.GetLength() + payload := obj.Payload() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange)) + } + + res := common.GetRangeRes{Data: payload[from:to]} + success = true + size = len(res.Data) + return res, nil +} + +func (s *Store) getObjectData(addr oid.Address) ([]byte, error) { + var data []byte + tx := s.db.NewTransaction(false) + defer tx.Discard() + + item, err := tx.Get(key(addr)) + if err != nil { + if err == badger.ErrKeyNotFound { + return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return nil, err + } + + data, err = item.ValueCopy(nil) + if err != nil { + return nil, err + } + return data, nil +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/iterate.go b/pkg/local_object_storage/blobstor/badgerstore/iterate.go new file mode 100644 index 000000000..be1e7f836 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/iterate.go @@ -0,0 +1,121 @@ +package badgerstore + +import ( + "bytes" + "context" + "fmt" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "github.com/dgraph-io/badger/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Iterate implements common.Storage. +func (s *Store) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) { + success := false + startedAt := time.Now() + + defer func() { + s.cfg.metrics.Iterate(time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Iterate", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.Bool("ignore_errors", prm.IgnoreErrors), + )) + defer span.End() + + var last []byte + opts := badger.DefaultIteratorOptions + batch := make([]keyValue, 0, opts.PrefetchSize) + opts.PrefetchSize++ // to skip last + for { + select { + case <-ctx.Done(): + return common.IterateRes{}, ctx.Err() + default: + } + + batch = batch[:0] + err := s.db.View(func(tx *badger.Txn) error { + it := tx.NewIterator(opts) + defer it.Close() + + for it.Seek(last); it.Valid(); it.Next() { + if bytes.Equal(last, it.Item().Key()) { + continue + } + + var kv keyValue + var err error + kv.key = it.Item().KeyCopy(nil) + kv.value, err = it.Item().ValueCopy(nil) + if err != nil { + if prm.IgnoreErrors { + continue + } + return err + } + batch = append(batch, kv) + last = kv.key + if len(batch) == opts.PrefetchSize-1 { + break + } + } + return nil + }) + if err != nil { + return common.IterateRes{}, err + } + + select { + case <-ctx.Done(): + return common.IterateRes{}, ctx.Err() + default: + } + + if len(batch) == 0 { + break + } + if err := s.iterateBatch(batch, prm); err != nil { + return common.IterateRes{}, err + } + } + + success = true + return common.IterateRes{}, nil +} + +func (s *Store) iterateBatch(batch []keyValue, prm common.IteratePrm) error { + for _, kv := range batch { + addr, err := address(kv.key) + if err != nil { + if prm.IgnoreErrors { + continue + } + } + data, err := s.cfg.compression.Decompress(kv.value) + if err != nil { + if prm.IgnoreErrors { + continue + } + return fmt.Errorf("could not decompress object data: %w", err) + } + + if err := prm.Handler(common.IterationElement{ + Address: addr, + ObjectData: data, + }); err != nil { + return err + } + } + return nil +} + +type keyValue struct { + key, value []byte +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/keys.go b/pkg/local_object_storage/blobstor/badgerstore/keys.go new file mode 100644 index 000000000..0cbd4acb4 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/keys.go @@ -0,0 +1,28 @@ +package badgerstore + +import ( + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +func key(add oid.Address) []byte { + res := make([]byte, 64) + add.Container().Encode(res) + add.Object().Encode(res[32:]) + return res +} + +func address(k []byte) (oid.Address, error) { + var res oid.Address + var containerID cid.ID + var objectID oid.ID + if err := containerID.Decode(k[:32]); err != nil { + return res, err + } + if err := objectID.Decode(k[32:]); err != nil { + return res, err + } + res.SetContainer(containerID) + res.SetObject(objectID) + return res, nil +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/metrics.go b/pkg/local_object_storage/blobstor/badgerstore/metrics.go new file mode 100644 index 000000000..051b087f0 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/metrics.go @@ -0,0 +1,31 @@ +package badgerstore + +import "time" + +type Metrics interface { + SetParentID(parentID string) + + SetMode(readOnly bool) + Close() + + Delete(d time.Duration, success bool) + Exists(d time.Duration, success bool) + GetRange(d time.Duration, size int, success bool) + Get(d time.Duration, size int, success bool) + Iterate(d time.Duration, success bool) + Put(d time.Duration, size int, success bool) +} + +var _ Metrics = (*noopMetrics)(nil) + +type noopMetrics struct{} + +func (*noopMetrics) Close() {} +func (*noopMetrics) Delete(time.Duration, bool) {} +func (*noopMetrics) Exists(time.Duration, bool) {} +func (*noopMetrics) Get(time.Duration, int, bool) {} +func (*noopMetrics) GetRange(time.Duration, int, bool) {} +func (*noopMetrics) Iterate(time.Duration, bool) {} +func (*noopMetrics) Put(time.Duration, int, bool) {} +func (*noopMetrics) SetMode(bool) {} +func (*noopMetrics) SetParentID(string) {} diff --git a/pkg/local_object_storage/blobstor/badgerstore/put.go b/pkg/local_object_storage/blobstor/badgerstore/put.go new file mode 100644 index 000000000..c05445100 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/put.go @@ -0,0 +1,52 @@ +package badgerstore + +import ( + "context" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// Put implements common.Storage. +func (s *Store) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { + success := false + size := 0 + startedAt := time.Now() + + defer func() { + s.cfg.metrics.Put(time.Since(startedAt), size, success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Put", + trace.WithAttributes( + attribute.String("path", s.cfg.db.Dir), + attribute.String("address", prm.Address.EncodeToString()), + attribute.Bool("dont_compress", prm.DontCompress), + )) + defer span.End() + + if s.readOnly() { + return common.PutRes{}, common.ErrReadOnly + } + + if !prm.DontCompress { + prm.RawData = s.cfg.compression.Compress(prm.RawData) + } + + b := s.db.NewWriteBatch() + defer b.Cancel() + + err := b.Set(key(prm.Address), prm.RawData) + if err != nil { + return common.PutRes{}, err + } + if err = b.Flush(); err != nil { + return common.PutRes{}, err + } + success = true + size = len(prm.RawData) + return common.PutRes{}, nil +} diff --git a/pkg/local_object_storage/blobstor/badgerstore/store.go b/pkg/local_object_storage/blobstor/badgerstore/store.go new file mode 100644 index 000000000..450ced888 --- /dev/null +++ b/pkg/local_object_storage/blobstor/badgerstore/store.go @@ -0,0 +1,68 @@ +package badgerstore + +import ( + "context" + "sync" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" + "github.com/dgraph-io/badger/v4" +) + +const ( + Type = "badgerstore" +) + +var _ common.Storage = (*Store)(nil) + +type Store struct { + cfg *cfg + db *badger.DB + + modeMtx *sync.Mutex // protects fields in group below + opened bool + gcCancel context.CancelFunc + + wg *sync.WaitGroup +} + +// New returns new Store instance with opts applied. +func New(opts ...Option) *Store { + s := &Store{ + cfg: defaultCfg(), + modeMtx: &sync.Mutex{}, + wg: &sync.WaitGroup{}, + } + for _, opt := range opts { + opt(s.cfg) + } + return s +} + +// Compressor implements common.Storage. +func (s *Store) Compressor() *compression.Config { + return s.cfg.compression +} + +// Path implements common.Storage. +func (s *Store) Path() string { + return s.cfg.db.Dir +} + +// SetCompressor implements common.Storage. +func (s *Store) SetCompressor(cc *compression.Config) { + s.cfg.compression = cc +} + +// SetParentID implements common.Storage. +func (s *Store) SetParentID(parentID string) { + s.cfg.metrics.SetParentID(parentID) +} + +// SetReportErrorFunc implements common.Storage. +func (*Store) SetReportErrorFunc(func(string, error)) {} + +// Type implements common.Storage. +func (*Store) Type() string { + return Type +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/control.go b/pkg/local_object_storage/blobstor/blobovniczatree/control.go index d993767b7..722ec0002 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/control.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/control.go @@ -50,7 +50,7 @@ func (b *Blobovniczas) Close() error { b.dbCache.Close() // order important b.activeDBManager.Close() b.commondbManager.Close() - + b.metrics.Close() return nil } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/put.go b/pkg/local_object_storage/blobstor/blobovniczatree/put.go index 6f9c8c0de..a02fef13e 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/put.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/put.go @@ -15,7 +15,7 @@ import ( "go.uber.org/zap" ) -// Put saves object in the maximum weight blobobnicza. +// Put saves object in the maximum weight blobovnicza. // // returns error if could not save object in any blobovnicza. func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { diff --git a/pkg/local_object_storage/blobstor/blobtree/blobtree.go b/pkg/local_object_storage/blobstor/blobtree/blobtree.go new file mode 100644 index 000000000..a50114676 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/blobtree.go @@ -0,0 +1,83 @@ +package blobtree + +import ( + "errors" + "path/filepath" + "strings" + "sync/atomic" + "syscall" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +var _ common.Storage = &BlobTree{} + +type BlobTree struct { + cfg cfg + dirLock *utilSync.KeyLocker[string] + fileLock *utilSync.KeyLocker[string] + compressor *compression.Config + dispatcher *rootDispatcher + suffix atomic.Uint64 +} + +func New(opts ...Option) *BlobTree { + b := &BlobTree{ + cfg: cfg{ + targetFileSizeBytes: 4 * 1024 * 1024, + rootPath: "./", + depth: 3, + permissions: 0700, + initWorkersCount: 1000, + metrics: &noopMetrics{}, + }, + dirLock: utilSync.NewKeyLocker[string](), + fileLock: utilSync.NewKeyLocker[string](), + } + + for _, opt := range opts { + opt(&b.cfg) + } + + b.dispatcher = newRootDispatcher() + + return b +} + +func (b *BlobTree) getDir(addr oid.Address) string { + sAddr := addr.Object().EncodeToString() + "." + addr.Container().EncodeToString() + var sb strings.Builder + size := int(b.cfg.depth * (directoryLength + 1)) // (character + slash for every level) + sb.Grow(size) + + for i := uint64(0); i < b.cfg.depth; i++ { + if i > 0 { + sb.WriteRune(filepath.Separator) + } + sb.WriteString(sAddr[:directoryLength]) + sAddr = sAddr[directoryLength:] + } + return sb.String() +} + +func (b *BlobTree) createDir(dir string, isSystemPath bool) error { + b.dirLock.Lock(dir) + defer b.dirLock.Unlock(dir) + + if !isSystemPath { + dir = b.getSystemPath(dir) + } + if err := util.MkdirAllX(dir, b.cfg.permissions); err != nil { + if errors.Is(err, syscall.ENOSPC) { + err = common.ErrNoSpace + return err + } + return err + } + + return nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/config.go b/pkg/local_object_storage/blobstor/blobtree/config.go new file mode 100644 index 000000000..daceab7d1 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/config.go @@ -0,0 +1,15 @@ +package blobtree + +import "io/fs" + +var directoryLength uint64 = 1 + +type cfg struct { + rootPath string + depth uint64 + targetFileSizeBytes uint64 + permissions fs.FileMode + readOnly bool + initWorkersCount int + metrics Metrics +} diff --git a/pkg/local_object_storage/blobstor/blobtree/content.go b/pkg/local_object_storage/blobstor/blobtree/content.go new file mode 100644 index 000000000..b3a457e54 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/content.go @@ -0,0 +1,209 @@ +package blobtree + +import ( + "crypto/sha256" + "encoding/binary" + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +const ( + defaultVersion = 0 + + sizeOfVersion = 1 + sizeOfCount = 8 + sizeOfDataLength = 8 + sizeOfContainerID = sha256.Size + sizeOfObjectID = sha256.Size + + dataExtension = ".data" +) + +var ( + errFileToSmall = errors.New("invalid file content: not enough bytes to read count of records") + errInvalidFileContentVersion = errors.New("invalid file content: not enough bytes to read record version") + errInvalidFileContentContainerID = errors.New("invalid file content: not enough bytes to read container ID") + errInvalidFileContentObjectID = errors.New("invalid file content: not enough bytes to read object ID") + errInvalidFileContentLength = errors.New("invalid file content: not enough bytes to read data length") + errInvalidFileContentData = errors.New("invalid file content: not enough bytes to read data") +) + +type objectData struct { + Version byte + Address oid.Address + Data []byte +} + +func (b *BlobTree) readFileContent(path string) ([]objectData, error) { + rawData, err := os.ReadFile(b.getSystemPath(path)) + if err != nil { + if os.IsNotExist(err) { + return []objectData{}, nil + } + return nil, err + } + return b.unmarshalSlice(rawData) +} + +func (b *BlobTree) unmarshalSlice(data []byte) ([]objectData, error) { + if len(data) < sizeOfCount { + return nil, errFileToSmall + } + count := binary.LittleEndian.Uint64(data[:8]) + result := make([]objectData, 0, count) + + data = data[sizeOfCount:] + var idx uint64 + for idx = 0; idx < count; idx++ { + record, read, err := b.unmarshalRecord(data) + if err != nil { + return nil, err + } + result = append(result, record) + data = data[read:] + } + + return result, nil +} + +func (b *BlobTree) unmarshalRecord(data []byte) (objectData, uint64, error) { + if len(data) < sizeOfVersion { + return objectData{}, 0, errInvalidFileContentVersion + } + var result objectData + var read uint64 + result.Version = data[0] + if result.Version != defaultVersion { + return objectData{}, 0, fmt.Errorf("invalid file content: unknown version %d", result.Version) + } + read += sizeOfVersion + + if len(data[read:]) < sizeOfContainerID { + return objectData{}, 0, errInvalidFileContentContainerID + } + var contID cid.ID + if err := contID.Decode(data[read : read+sizeOfContainerID]); err != nil { + return objectData{}, 0, fmt.Errorf("invalid file content: failed to read container ID: %w", err) + } + read += sizeOfContainerID + + if len(data[read:]) < sizeOfObjectID { + return objectData{}, 0, errInvalidFileContentObjectID + } + var objID oid.ID + if err := objID.Decode(data[read : read+sizeOfObjectID]); err != nil { + return objectData{}, 0, fmt.Errorf("invalid file content: failed to read object ID: %w", err) + } + read += sizeOfObjectID + + result.Address.SetContainer(contID) + result.Address.SetObject(objID) + + if len(data[read:]) < sizeOfDataLength { + return objectData{}, 0, errInvalidFileContentLength + } + dataLength := binary.LittleEndian.Uint64(data[read : read+sizeOfDataLength]) + read += sizeOfDataLength + + if uint64(len(data[read:])) < dataLength { + return objectData{}, 0, errInvalidFileContentData + } + result.Data = make([]byte, dataLength) + copy(result.Data, data[read:read+dataLength]) + read += dataLength + + return result, read, nil +} + +func (b *BlobTree) saveContentToFile(records []objectData, path string) (uint64, error) { + data, err := b.marshalSlice(records) + if err != nil { + return 0, err + } + return uint64(len(data)), b.writeFile(path, data) +} + +func (b *BlobTree) writeFile(p string, data []byte) error { + p = b.getSystemPath(p) + f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_EXCL|os.O_SYNC, b.cfg.permissions) + if err != nil { + return err + } + _, err = f.Write(data) + if err1 := f.Close(); err1 != nil && err == nil { + err = err1 + } + return err +} + +func (b *BlobTree) getSystemPath(path string) string { + return filepath.Join(b.cfg.rootPath, path) +} + +func (b *BlobTree) marshalSlice(records []objectData) ([]byte, error) { + buf := make([]byte, b.estimateSize(records)) + result := buf + binary.LittleEndian.PutUint64(buf, uint64(len(records))) + buf = buf[sizeOfCount:] + for _, record := range records { + written := b.marshalRecord(record, buf) + buf = buf[written:] + } + return result, nil +} + +func (b *BlobTree) marshalRecord(record objectData, dst []byte) uint64 { + var written uint64 + + dst[0] = record.Version + dst = dst[sizeOfVersion:] + written += sizeOfVersion + + record.Address.Container().Encode(dst) + dst = dst[sizeOfContainerID:] + written += sizeOfContainerID + + record.Address.Object().Encode(dst) + dst = dst[sizeOfObjectID:] + written += sizeOfObjectID + + binary.LittleEndian.PutUint64(dst, uint64(len(record.Data))) + dst = dst[sizeOfDataLength:] + written += sizeOfDataLength + + copy(dst, record.Data) + written += uint64(len(record.Data)) + + return written +} + +func (b *BlobTree) estimateSize(records []objectData) uint64 { + var result uint64 + result += sizeOfCount + for _, record := range records { + result += (sizeOfVersion + sizeOfContainerID + sizeOfObjectID + sizeOfDataLength) + result += uint64(len(record.Data)) + } + return result +} + +func (b *BlobTree) getFilePath(dir string, idx uint64) string { + return filepath.Join(dir, strconv.FormatUint(idx, 16)+dataExtension) +} + +func (b *BlobTree) parsePath(path string) (string, uint64, error) { + dir := filepath.Dir(path) + fileName := strings.TrimSuffix(filepath.Base(path), dataExtension) + idx, err := strconv.ParseUint(fileName, 16, 64) + if err != nil { + return "", 0, fmt.Errorf("failed to parse blobtree path: %w", err) + } + return dir, idx, nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/control.go b/pkg/local_object_storage/blobstor/blobtree/control.go new file mode 100644 index 000000000..584421b43 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/control.go @@ -0,0 +1,102 @@ +package blobtree + +import ( + "os" + "path/filepath" + "strconv" + "strings" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" + "golang.org/x/sync/errgroup" +) + +var Type = "blobtree" + +func (b *BlobTree) Open(readOnly bool) error { + b.cfg.readOnly = readOnly + b.cfg.metrics.SetMode(readOnly) + return nil +} + +func (b *BlobTree) Init() error { + if err := b.createDir(b.cfg.rootPath, true); err != nil { + return err + } + + var eg errgroup.Group + eg.SetLimit(b.cfg.initWorkersCount) + eg.Go(func() error { + return b.initDir(&eg, "") + }) + return eg.Wait() +} + +func (b *BlobTree) initDir(eg *errgroup.Group, dir string) error { + entities, err := os.ReadDir(b.getSystemPath(dir)) + if err != nil { + return err + } + for _, entity := range entities { + if entity.IsDir() { + eg.Go(func() error { + return b.initDir(eg, filepath.Join(dir, entity.Name())) + }) + continue + } + + path := filepath.Join(dir, entity.Name()) + + if b.isTempFile(entity.Name()) { + if err = os.Remove(b.getSystemPath(path)); err != nil { + return err + } + continue + } + + idx, err := b.parseIdx(entity.Name()) + if err != nil { + return err + } + b.dispatcher.Init(dir, idx) + b.cfg.metrics.IncFilesCount() + + stat, err := os.Stat(b.getSystemPath(path)) + if err != nil { + return err + } + if stat.Size() < int64(b.cfg.targetFileSizeBytes) { + b.dispatcher.ReturnIdx(dir, idx) + } + } + return nil +} + +func (b *BlobTree) isTempFile(name string) bool { + return strings.Contains(name, tempFileSymbols) +} + +func (b *BlobTree) parseIdx(name string) (uint64, error) { + return strconv.ParseUint(strings.TrimSuffix(name, dataExtension), 16, 64) +} + +func (b *BlobTree) Close() error { + b.cfg.metrics.Close() + return nil +} + +func (b *BlobTree) Type() string { return Type } +func (b *BlobTree) Path() string { return b.cfg.rootPath } + +func (b *BlobTree) SetCompressor(cc *compression.Config) { + b.compressor = cc +} + +func (b *BlobTree) Compressor() *compression.Config { + return b.compressor +} + +func (b *BlobTree) SetReportErrorFunc(_ func(string, error)) {} + +func (b *BlobTree) SetParentID(parentID string) { + b.cfg.metrics.SetParentID(parentID) +} diff --git a/pkg/local_object_storage/blobstor/blobtree/delete.go b/pkg/local_object_storage/blobstor/blobtree/delete.go new file mode 100644 index 000000000..bfc46ef2b --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/delete.go @@ -0,0 +1,151 @@ +package blobtree + +import ( + "context" + "os" + "path/filepath" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +func (b *BlobTree) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) { + var ( + success = false + startedAt = time.Now() + ) + defer func() { + b.cfg.metrics.Delete(time.Since(startedAt), success, prm.StorageID != nil) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BlobTree.Delete", + trace.WithAttributes( + attribute.String("path", b.cfg.rootPath), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", string(prm.StorageID)), + )) + defer span.End() + + if b.cfg.readOnly { + return common.DeleteRes{}, common.ErrReadOnly + } + + var res common.DeleteRes + var err error + if path, ok := getPathFromStorageID(prm.StorageID); ok { + res, err = b.deleteFromPath(prm.Address, path) + } else { + res, err = b.findAndDelete(prm.Address) + } + success = err == nil + return res, err +} + +func (b *BlobTree) deleteFromPath(addr oid.Address, path string) (common.DeleteRes, error) { + b.fileLock.Lock(path) + defer b.fileLock.Unlock(path) + + dir, idx, err := b.parsePath(path) + if err != nil { + return common.DeleteRes{}, err + } + + records, err := b.readFileContent(path) + if err != nil { + return common.DeleteRes{}, err + } + + deleteIdx := -1 + for i := range records { + if records[i].Address.Equals(addr) { + deleteIdx = i + break + } + } + + if deleteIdx == -1 { + return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + + if len(records) == 1 { + err = os.Remove(b.getSystemPath(path)) + if err == nil { + b.dispatcher.ReturnIdx(dir, idx) + b.cfg.metrics.DecFilesCount() + } + return common.DeleteRes{}, err + } + + records = append(records[:deleteIdx], records[deleteIdx+1:]...) + size, err := b.writeToTmpAndRename(records, path) + if err != nil { + return common.DeleteRes{}, err + } + if size < b.cfg.targetFileSizeBytes { + b.dispatcher.ReturnIdx(dir, idx) + } + return common.DeleteRes{}, nil +} + +func (b *BlobTree) findAndDelete(addr oid.Address) (common.DeleteRes, error) { + dir := b.getDir(addr) + idx, err := b.findFileIdx(dir, addr) + if err != nil { + return common.DeleteRes{}, err + } + return b.deleteFromPath(addr, b.getFilePath(dir, idx)) +} + +func (b *BlobTree) findFileIdx(dir string, addr oid.Address) (uint64, error) { + entities, err := os.ReadDir(filepath.Join(b.cfg.rootPath, dir)) + if err != nil { + if os.IsNotExist(err) { + return 0, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return 0, err + } + for _, entity := range entities { + if entity.IsDir() { + continue + } + if b.isTempFile(entity.Name()) { + continue + } + idx, err := b.parseIdx(entity.Name()) + if err != nil { + continue + } + path := b.getFilePath(dir, idx) + contains, err := b.fileContainsObject(path, addr) + if err != nil { + return 0, err + } + if contains { + return idx, nil + } + } + return 0, logicerr.Wrap(new(apistatus.ObjectNotFound)) +} + +func (b *BlobTree) fileContainsObject(path string, addr oid.Address) (bool, error) { + b.fileLock.RLock(path) + defer b.fileLock.RUnlock(path) + + records, err := b.readFileContent(path) + if err != nil { + return false, err + } + + for i := range records { + if records[i].Address.Equals(addr) { + return true, nil + } + } + return false, nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/dispatcher.go b/pkg/local_object_storage/blobstor/blobtree/dispatcher.go new file mode 100644 index 000000000..1ce5d9da9 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/dispatcher.go @@ -0,0 +1,94 @@ +package blobtree + +import ( + "sync" +) + +type rootDispatcher struct { + dispatchers map[string]*dirDispatcher + guard sync.Mutex +} + +func newRootDispatcher() *rootDispatcher { + return &rootDispatcher{ + dispatchers: make(map[string]*dirDispatcher), + } +} + +func (d *rootDispatcher) GetIdxForWrite(dir string) uint64 { + return d.getDirDispatcher(dir).GetIdxForWrite() +} + +func (d *rootDispatcher) ReturnIdx(dir string, idx uint64) { + d.getDirDispatcher(dir).ReturnIdx(idx) +} + +func (d *rootDispatcher) Init(dir string, idx uint64) { + d.getDirDispatcher(dir).Init(idx) +} + +func (d *rootDispatcher) getDirDispatcher(dir string) *dirDispatcher { + d.guard.Lock() + defer d.guard.Unlock() + + if result, ok := d.dispatchers[dir]; ok { + return result + } + + result := newDirDispatcher(dir) + d.dispatchers[dir] = result + return result +} + +type dirDispatcher struct { + dir string + guard sync.Mutex + indicies map[uint64]struct{} + nextIndex uint64 +} + +func newDirDispatcher(dir string) *dirDispatcher { + return &dirDispatcher{ + dir: dir, + indicies: make(map[uint64]struct{}), + } +} + +func (d *dirDispatcher) GetIdxForWrite() uint64 { + d.guard.Lock() + defer d.guard.Unlock() + + var result uint64 + var found bool + + for idx := range d.indicies { + result = idx + found = true + break + } + + if found { + delete(d.indicies, result) + return result + } + + result = d.nextIndex + d.nextIndex++ + return result +} + +func (d *dirDispatcher) ReturnIdx(idx uint64) { + d.guard.Lock() + defer d.guard.Unlock() + + d.indicies[idx] = struct{}{} +} + +func (d *dirDispatcher) Init(idx uint64) { + d.guard.Lock() + defer d.guard.Unlock() + + if d.nextIndex <= idx { + d.nextIndex = idx + 1 + } +} diff --git a/pkg/local_object_storage/blobstor/blobtree/dispatcher_test.go b/pkg/local_object_storage/blobstor/blobtree/dispatcher_test.go new file mode 100644 index 000000000..ed72bf948 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/dispatcher_test.go @@ -0,0 +1,29 @@ +package blobtree + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDispatcher(t *testing.T) { + t.Parallel() + + d := newRootDispatcher() + idx := d.GetIdxForWrite("/dir1") + require.Equal(t, uint64(0), idx) + d.ReturnIdx("/dir1", idx) + + idx = d.GetIdxForWrite("/dir1") + require.Equal(t, uint64(0), idx) + + idx = d.GetIdxForWrite("/dir1") + require.Equal(t, uint64(1), idx) + + d.Init("/dir2", 5) + idx = d.GetIdxForWrite("/dir2") + require.Equal(t, uint64(6), idx) + + idx = d.GetIdxForWrite("/dir2") + require.Equal(t, uint64(7), idx) +} diff --git a/pkg/local_object_storage/blobstor/blobtree/exists.go b/pkg/local_object_storage/blobstor/blobtree/exists.go new file mode 100644 index 000000000..559cbf57e --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/exists.go @@ -0,0 +1,76 @@ +package blobtree + +import ( + "context" + "errors" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +func (b *BlobTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + b.cfg.metrics.Exists(time.Since(startedAt), success, prm.StorageID != nil) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BlobTree.Exists", + trace.WithAttributes( + attribute.String("path", b.cfg.rootPath), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", string(prm.StorageID)), + )) + defer span.End() + + var res common.ExistsRes + var err error + if path, ok := getPathFromStorageID(prm.StorageID); ok { + res, err = b.existsFromPath(prm.Address, path) + } else { + res, err = b.findAndCheckExistance(prm.Address) + } + success = err == nil + return res, err +} + +func (b *BlobTree) existsFromPath(addr oid.Address, path string) (common.ExistsRes, error) { + b.fileLock.RLock(path) + defer b.fileLock.RUnlock(path) + + records, err := b.readFileContent(path) + if err != nil { + return common.ExistsRes{}, err + } + + for i := range records { + if records[i].Address.Equals(addr) { + return common.ExistsRes{ + Exists: true, + }, nil + } + } + + return common.ExistsRes{}, nil +} + +func (b *BlobTree) findAndCheckExistance(addr oid.Address) (common.ExistsRes, error) { + dir := b.getDir(addr) + _, err := b.findFileIdx(dir, addr) + if err == nil { + return common.ExistsRes{Exists: true}, nil + } + + var notFound *apistatus.ObjectNotFound + if errors.As(err, ¬Found) { + return common.ExistsRes{}, nil + } + return common.ExistsRes{}, err +} diff --git a/pkg/local_object_storage/blobstor/blobtree/generic_test.go b/pkg/local_object_storage/blobstor/blobtree/generic_test.go new file mode 100644 index 000000000..f0c24aee5 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/generic_test.go @@ -0,0 +1,39 @@ +package blobtree + +import ( + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest" +) + +func TestGeneric(t *testing.T) { + newTreeFromPath := func(path string) common.Storage { + return New( + WithPath(path), + WithDepth(2)) + } + + newTree := func(t *testing.T) common.Storage { + return newTreeFromPath(t.TempDir()) + } + + blobstortest.TestAll(t, newTree, 2048, 16*1024) + + t.Run("info", func(t *testing.T) { + path := t.TempDir() + blobstortest.TestInfo(t, func(*testing.T) common.Storage { + return newTreeFromPath(path) + }, Type, path) + }) +} + +func TestControl(t *testing.T) { + newTree := func(t *testing.T) common.Storage { + return New( + WithPath(t.TempDir()), + WithDepth(2)) + } + + blobstortest.TestControl(t, newTree, 2048, 2048) +} diff --git a/pkg/local_object_storage/blobstor/blobtree/get.go b/pkg/local_object_storage/blobstor/blobtree/get.go new file mode 100644 index 000000000..de92b0239 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/get.go @@ -0,0 +1,129 @@ +package blobtree + +import ( + "context" + "os" + "path/filepath" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +func (b *BlobTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) { + var ( + startedAt = time.Now() + success = false + size = 0 + ) + defer func() { + b.cfg.metrics.Get(time.Since(startedAt), size, success, prm.StorageID != nil) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BlobTree.Get", + trace.WithAttributes( + attribute.String("path", b.cfg.rootPath), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", string(prm.StorageID)), + attribute.Bool("raw", prm.Raw), + )) + defer span.End() + + res, err := b.get(prm) + success = err == nil + size = len(res.RawData) + return res, err +} + +func (b *BlobTree) get(prm common.GetPrm) (common.GetRes, error) { + if path, ok := getPathFromStorageID(prm.StorageID); ok { + return b.getFromPath(prm.Address, path) + } + return b.findAndGet(prm.Address) +} + +func (b *BlobTree) getFromPath(addr oid.Address, path string) (common.GetRes, error) { + b.fileLock.RLock(path) + defer b.fileLock.RUnlock(path) + + records, err := b.readFileContent(path) + if err != nil { + return common.GetRes{}, err + } + + for _, record := range records { + if record.Address.Equals(addr) { + return b.unmarshalGetRes(record) + } + } + + return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) +} + +func (b *BlobTree) unmarshalGetRes(record objectData) (common.GetRes, error) { + data, err := b.compressor.Decompress(record.Data) + if err != nil { + return common.GetRes{}, err + } + + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + return common.GetRes{}, err + } + return common.GetRes{Object: obj, RawData: data}, nil +} + +func (b *BlobTree) findAndGet(addr oid.Address) (common.GetRes, error) { + dir := b.getDir(addr) + entities, err := os.ReadDir(filepath.Join(b.cfg.rootPath, dir)) + if err != nil { + if os.IsNotExist(err) { + return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) + } + return common.GetRes{}, err + } + for _, entity := range entities { + if entity.IsDir() { + continue + } + if b.isTempFile(entity.Name()) { + continue + } + path := filepath.Join(dir, entity.Name()) + res, err := b.tryReadObject(path, addr) + if err != nil { + return common.GetRes{}, err + } + if res.Object != nil { + return res, nil + } + } + return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound)) +} + +func (b *BlobTree) tryReadObject(path string, addr oid.Address) (common.GetRes, error) { + b.fileLock.RLock(path) + defer b.fileLock.RUnlock(path) + + records, err := b.readFileContent(path) + if err != nil { + return common.GetRes{}, err + } + + for _, record := range records { + if record.Address.Equals(addr) { + res, err := b.unmarshalGetRes(record) + if err != nil { + return common.GetRes{}, err + } + return res, nil + } + } + return common.GetRes{}, nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/get_range.go b/pkg/local_object_storage/blobstor/blobtree/get_range.go new file mode 100644 index 000000000..a89693c9f --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/get_range.go @@ -0,0 +1,55 @@ +package blobtree + +import ( + "context" + "strconv" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +func (b *BlobTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) { + var ( + startedAt = time.Now() + success = false + size = 0 + ) + defer func() { + b.cfg.metrics.GetRange(time.Since(startedAt), size, success, prm.StorageID != nil) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BlobTree.GetRange", + trace.WithAttributes( + attribute.String("path", b.cfg.rootPath), + attribute.String("address", prm.Address.EncodeToString()), + attribute.String("storage_id", string(prm.StorageID)), + attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)), + attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)), + )) + defer span.End() + + gRes, err := b.get(common.GetPrm{Address: prm.Address, StorageID: prm.StorageID}) + if err != nil { + return common.GetRangeRes{}, err + } + + payload := gRes.Object.Payload() + from := prm.Range.GetOffset() + to := from + prm.Range.GetLength() + + if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { + return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange)) + } + + res := common.GetRangeRes{ + Data: payload[from:to], + } + size = len(res.Data) + success = true + return res, nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/iterate.go b/pkg/local_object_storage/blobstor/blobtree/iterate.go new file mode 100644 index 000000000..4ea9d7ab8 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/iterate.go @@ -0,0 +1,97 @@ +package blobtree + +import ( + "context" + "os" + "path/filepath" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +func (b *BlobTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) { + var ( + startedAt = time.Now() + err error + ) + defer func() { + b.cfg.metrics.Iterate(time.Since(startedAt), err == nil) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BlobTree.Iterate", + trace.WithAttributes( + attribute.String("path", b.cfg.rootPath), + attribute.Bool("ignore_errors", prm.IgnoreErrors), + )) + defer span.End() + + err = b.iterateDir("", prm) + return common.IterateRes{}, err +} + +func (b *BlobTree) iterateDir(dir string, prm common.IteratePrm) error { + entities, err := os.ReadDir(filepath.Join(b.cfg.rootPath, dir)) + if err != nil { + if prm.IgnoreErrors { + return nil + } + return err + } + + for _, entity := range entities { + if entity.IsDir() { + err := b.iterateDir(filepath.Join(dir, entity.Name()), prm) + if err != nil { + return err + } + continue + } + + if b.isTempFile(entity.Name()) { + continue + } + + path := filepath.Join(dir, entity.Name()) + err = b.iterateRecords(path, prm) + if err != nil { + return err + } + } + return nil +} + +func (b *BlobTree) iterateRecords(path string, prm common.IteratePrm) error { + b.fileLock.RLock(path) + defer b.fileLock.RUnlock(path) + + records, err := b.readFileContent(path) + if err != nil { + if prm.IgnoreErrors { + return nil + } + return err + } + + for _, record := range records { + record.Data, err = b.compressor.Decompress(record.Data) + if err != nil { + if prm.IgnoreErrors { + continue + } + return err + } + + err = prm.Handler(common.IterationElement{ + Address: record.Address, + ObjectData: record.Data, + StorageID: pathToStorageID(path), + }) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/metrics.go b/pkg/local_object_storage/blobstor/blobtree/metrics.go new file mode 100644 index 000000000..2008ad605 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/metrics.go @@ -0,0 +1,34 @@ +package blobtree + +import "time" + +type Metrics interface { + SetParentID(parentID string) + + SetMode(readOnly bool) + Close() + + Delete(d time.Duration, success, withStorageID bool) + Exists(d time.Duration, success, withStorageID bool) + GetRange(d time.Duration, size int, success, withStorageID bool) + Get(d time.Duration, size int, success, withStorageID bool) + Iterate(d time.Duration, success bool) + Put(d time.Duration, size int, success bool) + + IncFilesCount() + DecFilesCount() +} + +type noopMetrics struct{} + +func (m *noopMetrics) SetParentID(string) {} +func (m *noopMetrics) SetMode(bool) {} +func (m *noopMetrics) Close() {} +func (m *noopMetrics) Delete(time.Duration, bool, bool) {} +func (m *noopMetrics) Exists(time.Duration, bool, bool) {} +func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {} +func (m *noopMetrics) Get(time.Duration, int, bool, bool) {} +func (m *noopMetrics) Iterate(time.Duration, bool) {} +func (m *noopMetrics) Put(time.Duration, int, bool) {} +func (m *noopMetrics) IncFilesCount() {} +func (m *noopMetrics) DecFilesCount() {} diff --git a/pkg/local_object_storage/blobstor/blobtree/option.go b/pkg/local_object_storage/blobstor/blobtree/option.go new file mode 100644 index 000000000..786fc3f4c --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/option.go @@ -0,0 +1,35 @@ +package blobtree + +import "io/fs" + +type Option func(*cfg) + +func WithPath(path string) Option { + return func(c *cfg) { + c.rootPath = path + } +} + +func WithDepth(depth uint64) Option { + return func(c *cfg) { + c.depth = depth + } +} + +func WithPerm(p fs.FileMode) Option { + return func(c *cfg) { + c.permissions = p + } +} + +func WithTargetSize(size uint64) Option { + return func(c *cfg) { + c.targetFileSizeBytes = size + } +} + +func WithMetrics(m Metrics) Option { + return func(c *cfg) { + c.metrics = m + } +} diff --git a/pkg/local_object_storage/blobstor/blobtree/put.go b/pkg/local_object_storage/blobstor/blobtree/put.go new file mode 100644 index 000000000..6d6653171 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/put.go @@ -0,0 +1,121 @@ +package blobtree + +import ( + "context" + "os" + "strconv" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +const ( + tempFileSymbols = "###" +) + +func (b *BlobTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) { + var ( + success bool + size int + startedAt = time.Now() + ) + defer func() { + b.cfg.metrics.Put(time.Since(startedAt), size, success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "BlobTree.Put", + trace.WithAttributes( + attribute.String("address", prm.Address.EncodeToString()), + attribute.Bool("dont_compress", prm.DontCompress), + )) + defer span.End() + + if b.cfg.readOnly { + return common.PutRes{}, common.ErrReadOnly + } + + dir := b.getDir(prm.Address) + + if err := b.createDir(dir, false); err != nil { + return common.PutRes{}, err + } + + if !prm.DontCompress { + prm.RawData = b.compressor.Compress(prm.RawData) + } + + path, err := b.saveToLocalDir(prm, dir) + if err != nil { + return common.PutRes{}, err + } + + success = true + size = len(prm.RawData) + + return common.PutRes{StorageID: pathToStorageID(path)}, nil +} + +func (b *BlobTree) saveToLocalDir(prm common.PutPrm, dir string) (string, error) { + returnIdx := true + idx := b.dispatcher.GetIdxForWrite(dir) + path := b.getFilePath(dir, idx) + + b.fileLock.Lock(path) + defer b.fileLock.Unlock(path) + + defer func() { + if returnIdx { + b.dispatcher.ReturnIdx(dir, idx) + } + }() + + currentContent, err := b.readFileContent(path) + if err != nil { + return "", err + } + var newRecord objectData + newRecord.Address = prm.Address + newRecord.Data = prm.RawData + + size, err := b.writeToTmpAndRename(append(currentContent, newRecord), path) + if err != nil { + return "", err + } + returnIdx = size < b.cfg.targetFileSizeBytes + + return path, nil +} + +func (b *BlobTree) writeToTmpAndRename(records []objectData, path string) (uint64, error) { + tmpPath := path + tempFileSymbols + strconv.FormatUint(b.suffix.Add(1), 16) + + size, err := b.saveContentToFile(records, tmpPath) + if err != nil { + _ = os.Remove(b.getSystemPath(tmpPath)) + return 0, err + } + + newFile := false + _, err = os.Stat(b.getSystemPath(path)) + if err != nil { + if os.IsNotExist(err) { + newFile = true + } else { + return 0, err + } + } + + if err := os.Rename(b.getSystemPath(tmpPath), b.getSystemPath(path)); err != nil { + _ = os.Remove(b.getSystemPath(tmpPath)) + return 0, err + } + + if newFile { + b.cfg.metrics.IncFilesCount() + } + + return size, nil +} diff --git a/pkg/local_object_storage/blobstor/blobtree/storage_id.go b/pkg/local_object_storage/blobstor/blobtree/storage_id.go new file mode 100644 index 000000000..64a268eb2 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobtree/storage_id.go @@ -0,0 +1,15 @@ +package blobtree + +import ( + "strings" +) + +const storageIDPrefix = "blobtree:" + +func getPathFromStorageID(storageID []byte) (string, bool) { + return strings.CutPrefix(string(storageID), storageIDPrefix) +} + +func pathToStorageID(path string) []byte { + return []byte(storageIDPrefix + path) +} diff --git a/pkg/local_object_storage/blobstor/perf_test.go b/pkg/local_object_storage/blobstor/perf_test.go index c773ea0ee..573b87cc2 100644 --- a/pkg/local_object_storage/blobstor/perf_test.go +++ b/pkg/local_object_storage/blobstor/perf_test.go @@ -5,7 +5,9 @@ import ( "fmt" "testing" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/memstore" @@ -81,6 +83,23 @@ var storages = []storage{ ) }, }, + { + desc: "blobtree", + create: func(dir string) common.Storage { + return blobtree.New( + blobtree.WithDepth(2), + blobtree.WithPath(dir), + ) + }, + }, + { + desc: "badger", + create: func(dir string) common.Storage { + return badgerstore.New( + badgerstore.WithPath(dir), + ) + }, + }, } func BenchmarkSubstorageReadPerf(b *testing.B) { diff --git a/pkg/local_object_storage/metrics/badgerstore.go b/pkg/local_object_storage/metrics/badgerstore.go new file mode 100644 index 000000000..41a8111ac --- /dev/null +++ b/pkg/local_object_storage/metrics/badgerstore.go @@ -0,0 +1,74 @@ +package metrics + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore" + metrics_impl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" +) + +func NewBadgerStoreMetrics(path string, m metrics_impl.BadgerStoreMetrics) badgerstore.Metrics { + return &badgerStoreMetrics{ + path: path, + m: m, + } +} + +type badgerStoreMetrics struct { + path, shardID string + m metrics_impl.BadgerStoreMetrics +} + +// Close implements badgerstore.Metrics. +func (m *badgerStoreMetrics) Close() { + m.m.Close(m.shardID, m.path) +} + +// Delete implements badgerstore.Metrics. +func (m *badgerStoreMetrics) Delete(d time.Duration, success bool) { + m.m.MethodDuration(m.shardID, m.path, "Delete", d, success) +} + +// Exists implements badgerstore.Metrics. +func (m *badgerStoreMetrics) Exists(d time.Duration, success bool) { + m.m.MethodDuration(m.shardID, m.path, "Exists", d, success) +} + +// Get implements badgerstore.Metrics. +func (m *badgerStoreMetrics) Get(d time.Duration, size int, success bool) { + m.m.MethodDuration(m.shardID, m.path, "Get", d, success) + if success { + m.m.AddGet(m.shardID, m.path, size) + } +} + +// GetRange implements badgerstore.Metrics. +func (m *badgerStoreMetrics) GetRange(d time.Duration, size int, success bool) { + m.m.MethodDuration(m.shardID, m.path, "GetRange", d, success) + if success { + m.m.AddGet(m.shardID, m.path, size) + } +} + +// Iterate implements badgerstore.Metrics. +func (m *badgerStoreMetrics) Iterate(d time.Duration, success bool) { + m.m.MethodDuration(m.shardID, m.path, "Iterate", d, success) +} + +// Put implements badgerstore.Metrics. +func (m *badgerStoreMetrics) Put(d time.Duration, size int, success bool) { + m.m.MethodDuration(m.shardID, m.path, "Put", d, success) + if success { + m.m.AddPut(m.shardID, m.path, size) + } +} + +// SetMode implements badgerstore.Metrics. +func (m *badgerStoreMetrics) SetMode(readOnly bool) { + m.m.SetMode(m.shardID, m.path, readOnly) +} + +// SetParentID implements badgerstore.Metrics. +func (m *badgerStoreMetrics) SetParentID(parentID string) { + m.shardID = parentID +} diff --git a/pkg/local_object_storage/metrics/blobovnicza.go b/pkg/local_object_storage/metrics/blobovnicza.go index 0d0318b3b..16f21cf44 100644 --- a/pkg/local_object_storage/metrics/blobovnicza.go +++ b/pkg/local_object_storage/metrics/blobovnicza.go @@ -8,7 +8,7 @@ import ( metrics_impl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" ) -func NewBlobovniczaTreeMetrics(path string, m metrics_impl.BlobobvnizcaMetrics) blobovniczatree.Metrics { +func NewBlobovniczaTreeMetrics(path string, m metrics_impl.BlobovniczaMetrics) blobovniczatree.Metrics { return &blobovniczaTreeMetrics{ path: path, shardID: undefined, @@ -19,7 +19,7 @@ func NewBlobovniczaTreeMetrics(path string, m metrics_impl.BlobobvnizcaMetrics) type blobovniczaTreeMetrics struct { shardID string path string - m metrics_impl.BlobobvnizcaMetrics + m metrics_impl.BlobovniczaMetrics } func (m *blobovniczaTreeMetrics) Blobovnicza() blobovnicza.Metrics { @@ -35,48 +35,48 @@ func (m *blobovniczaTreeMetrics) SetParentID(parentID string) { } func (m *blobovniczaTreeMetrics) SetMode(readOnly bool) { - m.m.SetBlobobvnizcaTreeMode(m.shardID, m.path, readOnly) + m.m.SetBlobovniczaTreeMode(m.shardID, m.path, readOnly) } func (m *blobovniczaTreeMetrics) Close() { - m.m.CloseBlobobvnizcaTree(m.shardID, m.path) + m.m.CloseBlobovniczaTree(m.shardID, m.path) } func (m *blobovniczaTreeMetrics) Delete(d time.Duration, success, withStorageID bool) { - m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) + m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) } func (m *blobovniczaTreeMetrics) Exists(d time.Duration, success, withStorageID bool) { - m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Exists", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) + m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Exists", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) } func (m *blobovniczaTreeMetrics) GetRange(d time.Duration, size int, success, withStorageID bool) { - m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "GetRange", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) + m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "GetRange", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) if success { - m.m.AddBlobobvnizcaTreeGet(m.shardID, m.path, size) + m.m.AddBlobovniczaTreeGet(m.shardID, m.path, size) } } func (m *blobovniczaTreeMetrics) Get(d time.Duration, size int, success, withStorageID bool) { - m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Get", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) + m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Get", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) if success { - m.m.AddBlobobvnizcaTreeGet(m.shardID, m.path, size) + m.m.AddBlobovniczaTreeGet(m.shardID, m.path, size) } } func (m *blobovniczaTreeMetrics) Iterate(d time.Duration, success bool) { - m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Iterate", d, success, metrics_impl.NullBool{}) + m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Iterate", d, success, metrics_impl.NullBool{}) } func (m *blobovniczaTreeMetrics) Put(d time.Duration, size int, success bool) { - m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Put", d, success, metrics_impl.NullBool{}) + m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Put", d, success, metrics_impl.NullBool{}) if success { - m.m.AddBlobobvnizcaTreePut(m.shardID, m.path, size) + m.m.AddBlobovniczaTreePut(m.shardID, m.path, size) } } type blobovniczaMetrics struct { - m metrics_impl.BlobobvnizcaMetrics + m metrics_impl.BlobovniczaMetrics shardID func() string path string } diff --git a/pkg/local_object_storage/metrics/blobtree.go b/pkg/local_object_storage/metrics/blobtree.go new file mode 100644 index 000000000..e78685fae --- /dev/null +++ b/pkg/local_object_storage/metrics/blobtree.go @@ -0,0 +1,74 @@ +package metrics + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree" + metrics_impl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" +) + +func NewBlobTreeMetrics(path string, m metrics_impl.BlobTreeMetrics) blobtree.Metrics { + return &blobTreeMetrics{ + path: path, + m: m, + } +} + +type blobTreeMetrics struct { + shardID string + path string + m metrics_impl.BlobTreeMetrics +} + +func (m *blobTreeMetrics) SetParentID(parentID string) { + m.shardID = parentID +} + +func (m *blobTreeMetrics) SetMode(readOnly bool) { + m.m.SetBlobTreeMode(m.shardID, m.path, readOnly) +} + +func (m *blobTreeMetrics) Close() { + m.m.CloseBlobTree(m.shardID, m.path) +} + +func (m *blobTreeMetrics) Delete(d time.Duration, success, withStorageID bool) { + m.m.BlobTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) +} + +func (m *blobTreeMetrics) Exists(d time.Duration, success, withStorageID bool) { + m.m.BlobTreeMethodDuration(m.shardID, m.path, "Exists", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) +} + +func (m *blobTreeMetrics) GetRange(d time.Duration, size int, success, withStorageID bool) { + m.m.BlobTreeMethodDuration(m.shardID, m.path, "GetRange", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) + if success { + m.m.AddBlobTreeGet(m.shardID, m.path, size) + } +} + +func (m *blobTreeMetrics) Get(d time.Duration, size int, success, withStorageID bool) { + m.m.BlobTreeMethodDuration(m.shardID, m.path, "Get", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) + if success { + m.m.AddBlobTreeGet(m.shardID, m.path, size) + } +} + +func (m *blobTreeMetrics) Iterate(d time.Duration, success bool) { + m.m.BlobTreeMethodDuration(m.shardID, m.path, "Iterate", d, success, metrics_impl.NullBool{}) +} + +func (m *blobTreeMetrics) Put(d time.Duration, size int, success bool) { + m.m.BlobTreeMethodDuration(m.shardID, m.path, "Put", d, success, metrics_impl.NullBool{}) + if success { + m.m.AddBlobTreePut(m.shardID, m.path, size) + } +} + +func (m *blobTreeMetrics) IncFilesCount() { + m.m.IncBlobTreeFilesCount(m.shardID, m.path) +} + +func (m *blobTreeMetrics) DecFilesCount() { + m.m.DecBlobTreeFilesCount(m.shardID, m.path) +} diff --git a/pkg/metrics/badgerstore.go b/pkg/metrics/badgerstore.go new file mode 100644 index 000000000..4f18f58e9 --- /dev/null +++ b/pkg/metrics/badgerstore.go @@ -0,0 +1,98 @@ +package metrics + +import ( + "strconv" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +type BadgerStoreMetrics interface { + SetMode(shardID, path string, readOnly bool) + Close(shardID, path string) + MethodDuration(shardID, path string, method string, d time.Duration, success bool) + AddPut(shardID, path string, size int) + AddGet(shardID, path string, size int) +} + +var _ BadgerStoreMetrics = (*badgerStoreMetrics)(nil) + +type badgerStoreMetrics struct { + mode *shardIDPathModeValue + reqDuration *prometheus.HistogramVec + put *prometheus.CounterVec + get *prometheus.CounterVec +} + +func newbadgerStoreMetrics() *badgerStoreMetrics { + return &badgerStoreMetrics{ + mode: newShardIDPathMode(badgerStoreSubSystem, "mode", "BadgerStore mode"), + reqDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: badgerStoreSubSystem, + Name: "request_duration_seconds", + Help: "Accumulated BadgerStore request process duration", + }, []string{shardIDLabel, pathLabel, successLabel, methodLabel}), + put: metrics.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: badgerStoreSubSystem, + Name: "put_bytes", + Help: "Accumulated payload size written to BadgerStore", + }, []string{shardIDLabel, pathLabel}), + get: metrics.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: badgerStoreSubSystem, + Name: "get_bytes", + Help: "Accumulated payload size read from BadgerStore", + }, []string{shardIDLabel, pathLabel}), + } +} + +// AddGet implements BadgerStoreMetrics. +func (b *badgerStoreMetrics) AddGet(shardID string, path string, size int) { + b.get.With(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }).Add(float64(size)) +} + +// AddPut implements BadgerStoreMetrics. +func (b *badgerStoreMetrics) AddPut(shardID string, path string, size int) { + b.put.With(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }).Add(float64(size)) +} + +// Close implements BadgerStoreMetrics. +func (b *badgerStoreMetrics) Close(shardID string, path string) { + b.mode.SetMode(shardID, path, closedMode) + b.reqDuration.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }) + b.get.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }) + b.put.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }) +} + +// MethodDuration implements BadgerStoreMetrics. +func (b *badgerStoreMetrics) MethodDuration(shardID string, path string, method string, d time.Duration, success bool) { + b.reqDuration.With(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + successLabel: strconv.FormatBool(success), + methodLabel: method, + }).Observe(d.Seconds()) +} + +// SetMode implements BadgerStoreMetrics. +func (b *badgerStoreMetrics) SetMode(shardID string, path string, readOnly bool) { + b.mode.SetMode(shardID, path, modeFromBool(readOnly)) +} diff --git a/pkg/metrics/blobovnicza.go b/pkg/metrics/blobovnicza.go index a1ecbc700..7179a1bc4 100644 --- a/pkg/metrics/blobovnicza.go +++ b/pkg/metrics/blobovnicza.go @@ -8,12 +8,12 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -type BlobobvnizcaMetrics interface { - SetBlobobvnizcaTreeMode(shardID, path string, readOnly bool) - CloseBlobobvnizcaTree(shardID, path string) - BlobobvnizcaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) - AddBlobobvnizcaTreePut(shardID, path string, size int) - AddBlobobvnizcaTreeGet(shardID, path string, size int) +type BlobovniczaMetrics interface { + SetBlobovniczaTreeMode(shardID, path string, readOnly bool) + CloseBlobovniczaTree(shardID, path string) + BlobovniczaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) + AddBlobovniczaTreePut(shardID, path string, size int) + AddBlobovniczaTreeGet(shardID, path string, size int) AddOpenBlobovniczaSize(shardID, path string, size uint64) SubOpenBlobovniczaSize(shardID, path string, size uint64) @@ -78,11 +78,11 @@ func newBlobovnicza() *blobovnicza { } } -func (b *blobovnicza) SetBlobobvnizcaTreeMode(shardID, path string, readOnly bool) { +func (b *blobovnicza) SetBlobovniczaTreeMode(shardID, path string, readOnly bool) { b.treeMode.SetMode(shardID, path, modeFromBool(readOnly)) } -func (b *blobovnicza) CloseBlobobvnizcaTree(shardID, path string) { +func (b *blobovnicza) CloseBlobovniczaTree(shardID, path string) { b.treeMode.SetMode(shardID, path, closedMode) b.treeReqDuration.DeletePartialMatch(prometheus.Labels{ shardIDLabel: shardID, @@ -96,9 +96,21 @@ func (b *blobovnicza) CloseBlobobvnizcaTree(shardID, path string) { shardIDLabel: shardID, pathLabel: path, }) + b.treeOpenSize.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }) + b.treeOpenItems.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }) + b.treeOpenCounter.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }) } -func (b *blobovnicza) BlobobvnizcaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) { +func (b *blobovnicza) BlobovniczaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) { b.treeReqDuration.With(prometheus.Labels{ shardIDLabel: shardID, pathLabel: path, @@ -108,14 +120,14 @@ func (b *blobovnicza) BlobobvnizcaTreeMethodDuration(shardID, path string, metho }).Observe(d.Seconds()) } -func (b *blobovnicza) AddBlobobvnizcaTreePut(shardID, path string, size int) { +func (b *blobovnicza) AddBlobovniczaTreePut(shardID, path string, size int) { b.treePut.With(prometheus.Labels{ shardIDLabel: shardID, pathLabel: path, }).Add(float64(size)) } -func (b *blobovnicza) AddBlobobvnizcaTreeGet(shardID, path string, size int) { +func (b *blobovnicza) AddBlobovniczaTreeGet(shardID, path string, size int) { b.treeGet.With(prometheus.Labels{ shardIDLabel: shardID, pathLabel: path, diff --git a/pkg/metrics/blobtree.go b/pkg/metrics/blobtree.go new file mode 100644 index 000000000..798263a7c --- /dev/null +++ b/pkg/metrics/blobtree.go @@ -0,0 +1,120 @@ +package metrics + +import ( + "strconv" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-observability/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +type BlobTreeMetrics interface { + SetBlobTreeMode(shardID, path string, readOnly bool) + CloseBlobTree(shardID, path string) + BlobTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) + IncBlobTreeFilesCount(shardID, path string) + DecBlobTreeFilesCount(shardID, path string) + AddBlobTreePut(shardID, path string, size int) + AddBlobTreeGet(shardID, path string, size int) +} + +type blobTreeMetrics struct { + mode *shardIDPathModeValue + reqDuration *prometheus.HistogramVec + put *prometheus.CounterVec + get *prometheus.CounterVec + filesCount *prometheus.GaugeVec +} + +func newBlobTreeMetrics() *blobTreeMetrics { + return &blobTreeMetrics{ + mode: newShardIDPathMode(blobTreeSubSystem, "mode", "Blob tree mode"), + + reqDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: blobTreeSubSystem, + Name: "request_duration_seconds", + Help: "Accumulated Blob tree request process duration", + }, []string{shardIDLabel, pathLabel, successLabel, methodLabel, withStorageIDLabel}), + put: metrics.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: blobTreeSubSystem, + Name: "put_bytes", + Help: "Accumulated payload size written to Blob tree", + }, []string{shardIDLabel, pathLabel}), + get: metrics.NewCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: blobTreeSubSystem, + Name: "get_bytes", + Help: "Accumulated payload size read from Blob tree", + }, []string{shardIDLabel, pathLabel}), + filesCount: metrics.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: blobTreeSubSystem, + Name: "files_count", + Help: "Count of data files in Blob tree", + }, []string{shardIDLabel, pathLabel}), + } +} + +func (b *blobTreeMetrics) SetBlobTreeMode(shardID, path string, readOnly bool) { + b.mode.SetMode(shardID, path, modeFromBool(readOnly)) +} + +func (b *blobTreeMetrics) CloseBlobTree(shardID, path string) { + b.mode.SetMode(shardID, path, closedMode) + b.reqDuration.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }) + b.get.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }) + b.put.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }) + b.filesCount.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }) +} + +func (b *blobTreeMetrics) BlobTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) { + b.reqDuration.With(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + successLabel: strconv.FormatBool(success), + methodLabel: method, + withStorageIDLabel: withStorageID.String(), + }).Observe(d.Seconds()) +} + +func (b *blobTreeMetrics) IncBlobTreeFilesCount(shardID, path string) { + b.filesCount.With(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }).Inc() +} + +func (b *blobTreeMetrics) DecBlobTreeFilesCount(shardID, path string) { + b.filesCount.With(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }).Dec() +} + +func (b *blobTreeMetrics) AddBlobTreePut(shardID, path string, size int) { + b.put.With(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }).Add(float64(size)) +} + +func (b *blobTreeMetrics) AddBlobTreeGet(shardID, path string, size int) { + b.get.With(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }).Add(float64(size)) +} diff --git a/pkg/metrics/consts.go b/pkg/metrics/consts.go index f7a8fd771..9a9598e2d 100644 --- a/pkg/metrics/consts.go +++ b/pkg/metrics/consts.go @@ -7,6 +7,7 @@ const ( fstreeSubSystem = "fstree" blobstoreSubSystem = "blobstore" blobovniczaTreeSubSystem = "blobovnicza_tree" + blobTreeSubSystem = "blobtree" metabaseSubSystem = "metabase" piloramaSubSystem = "pilorama" engineSubsystem = "engine" @@ -21,6 +22,7 @@ const ( writeCacheSubsystem = "writecache" grpcServerSubsystem = "grpc_server" policerSubsystem = "policer" + badgerStoreSubSystem = "badgerstore" successLabel = "success" shardIDLabel = "shard_id" diff --git a/pkg/metrics/node.go b/pkg/metrics/node.go index 0dd86d90e..90ab444d6 100644 --- a/pkg/metrics/node.go +++ b/pkg/metrics/node.go @@ -16,10 +16,12 @@ type NodeMetrics struct { epoch prometheus.Gauge fstree *fstreeMetrics blobstore *blobstoreMetrics - blobobvnizca *blobovnicza + blobovnicza *blobovnicza metabase *metabaseMetrics pilorama *piloramaMetrics grpc *grpcServerMetrics + blobTree *blobTreeMetrics + badgerStore *badgerStoreMetrics policer *policerMetrics morphClient *morphClientMetrics morphCache *morphCacheMetrics @@ -39,16 +41,18 @@ func NewNodeMetrics() *NodeMetrics { Name: "epoch", Help: "Current epoch as seen by inner-ring node.", }), - fstree: newFSTreeMetrics(), - blobstore: newBlobstoreMetrics(), - blobobvnizca: newBlobovnicza(), - metabase: newMetabaseMetrics(), - pilorama: newPiloramaMetrics(), - grpc: newGrpcServerMetrics(), - policer: newPolicerMetrics(), - morphClient: newMorphClientMetrics(), - morphCache: newMorphCacheMetrics(namespace), - log: logger.NewLogMetrics(namespace), + fstree: newFSTreeMetrics(), + blobstore: newBlobstoreMetrics(), + blobovnicza: newBlobovnicza(), + metabase: newMetabaseMetrics(), + pilorama: newPiloramaMetrics(), + grpc: newGrpcServerMetrics(), + policer: newPolicerMetrics(), + morphClient: newMorphClientMetrics(), + morphCache: newMorphCacheMetrics(namespace), + log: logger.NewLogMetrics(namespace), + blobTree: newBlobTreeMetrics(), + badgerStore: newbadgerStoreMetrics(), } } @@ -85,8 +89,8 @@ func (m *NodeMetrics) Blobstore() BlobstoreMetrics { return m.blobstore } -func (m *NodeMetrics) BlobobvnizcaTreeMetrics() BlobobvnizcaMetrics { - return m.blobobvnizca +func (m *NodeMetrics) BlobovniczaTreeMetrics() BlobovniczaMetrics { + return m.blobovnicza } func (m *NodeMetrics) MetabaseMetrics() MetabaseMetrics { @@ -116,3 +120,11 @@ func (m *NodeMetrics) MorphCacheMetrics() MorphCacheMetrics { func (m *NodeMetrics) LogMetrics() logger.LogMetrics { return m.log } + +func (m *NodeMetrics) BlobTreeMetrics() BlobTreeMetrics { + return m.blobTree +} + +func (m *NodeMetrics) BadgerStoreMetrics() BadgerStoreMetrics { + return m.badgerStore +} diff --git a/pkg/util/sync/key_locker.go b/pkg/util/sync/key_locker.go index 97de0386d..2a5545569 100644 --- a/pkg/util/sync/key_locker.go +++ b/pkg/util/sync/key_locker.go @@ -3,8 +3,8 @@ package sync import "sync" type locker struct { - mtx sync.Mutex - waiters int // not protected by mtx, must used outer mutex to update concurrently + mtx sync.RWMutex + userCount int // not protected by mtx, must used outer mutex to update concurrently } type KeyLocker[K comparable] struct { @@ -19,26 +19,50 @@ func NewKeyLocker[K comparable]() *KeyLocker[K] { } func (l *KeyLocker[K]) Lock(key K) { + l.lock(key, false) +} + +func (l *KeyLocker[K]) RLock(key K) { + l.lock(key, true) +} + +func (l *KeyLocker[K]) lock(key K, read bool) { l.lockersMtx.Lock() if locker, found := l.lockers[key]; found { - locker.waiters++ + locker.userCount++ l.lockersMtx.Unlock() - locker.mtx.Lock() + if read { + locker.mtx.RLock() + } else { + locker.mtx.Lock() + } return } locker := &locker{ - waiters: 1, + userCount: 1, + } + if read { + locker.mtx.RLock() + } else { + locker.mtx.Lock() } - locker.mtx.Lock() l.lockers[key] = locker l.lockersMtx.Unlock() } func (l *KeyLocker[K]) Unlock(key K) { + l.unlock(key, false) +} + +func (l *KeyLocker[K]) RUnlock(key K) { + l.unlock(key, true) +} + +func (l *KeyLocker[K]) unlock(key K, read bool) { l.lockersMtx.Lock() defer l.lockersMtx.Unlock() @@ -47,10 +71,14 @@ func (l *KeyLocker[K]) Unlock(key K) { return } - if locker.waiters == 1 { + if locker.userCount == 1 { delete(l.lockers, key) } - locker.waiters-- + locker.userCount-- - locker.mtx.Unlock() + if read { + locker.mtx.RUnlock() + } else { + locker.mtx.Unlock() + } } diff --git a/pkg/util/sync/key_locker_test.go b/pkg/util/sync/key_locker_test.go index 3b3e6a694..f4ba3e19d 100644 --- a/pkg/util/sync/key_locker_test.go +++ b/pkg/util/sync/key_locker_test.go @@ -9,7 +9,7 @@ import ( "golang.org/x/sync/errgroup" ) -func TestKeyLocker(t *testing.T) { +func TestKeyLockerWrite(t *testing.T) { taken := false eg, _ := errgroup.WithContext(context.Background()) keyLocker := NewKeyLocker[int]() @@ -30,3 +30,17 @@ func TestKeyLocker(t *testing.T) { } require.NoError(t, eg.Wait()) } + +func TestKeyLockerRead(t *testing.T) { + eg, _ := errgroup.WithContext(context.Background()) + keyLocker := NewKeyLocker[int]() + for i := 0; i < 100; i++ { + eg.Go(func() error { + keyLocker.RLock(0) + defer keyLocker.RUnlock(0) + + return nil + }) + } + require.NoError(t, eg.Wait()) +}