WIP: Blobtree substorage #645
46 changed files with 2856 additions and 86 deletions
|
@ -21,7 +21,9 @@ import (
|
|||
contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
|
||||
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
||||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||
badgerstoreconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/badgerstore"
|
||||
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
||||
blobtreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobtree"
|
||||
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
||||
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
||||
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
||||
|
@ -32,7 +34,9 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
|
@ -181,6 +185,14 @@ type subStorageCfg struct {
|
|||
width uint64
|
||||
leafWidth uint64
|
||||
openedCacheSize int
|
||||
|
||||
// badgerstore-specific
|
||||
indexCacheSize int64
|
||||
memTablesCount int
|
||||
compactorsCount int
|
||||
gcInterval time.Duration
|
||||
gcDiscardRatio float64
|
||||
valueLogFileSize int64
|
||||
}
|
||||
|
||||
// readConfig fills applicationConfiguration with raw configuration values
|
||||
|
@ -302,6 +314,18 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
|
|||
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
|
||||
sCfg.depth = sub.Depth()
|
||||
sCfg.noSync = sub.NoSync()
|
||||
case blobtree.Type:
|
||||
sub := blobtreeconfig.From((*config.Config)(storagesCfg[i]))
|
||||
sCfg.depth = sub.Depth()
|
||||
sCfg.size = sub.Size()
|
||||
case badgerstore.Type:
|
||||
sub := badgerstoreconfig.From((*config.Config)(storagesCfg[i]))
|
||||
sCfg.indexCacheSize = sub.IndexCacheSize()
|
||||
sCfg.memTablesCount = sub.MemTablesCount()
|
||||
sCfg.compactorsCount = sub.CompactorsCount()
|
||||
sCfg.gcInterval = sub.GCInterval()
|
||||
sCfg.gcDiscardRatio = sub.GCDiscardRatio()
|
||||
sCfg.valueLogFileSize = sub.ValueLogFileSize()
|
||||
default:
|
||||
return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
|
||||
}
|
||||
|
@ -788,52 +812,37 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
|||
for _, sRead := range shCfg.subStorages {
|
||||
switch sRead.typ {
|
||||
case blobovniczatree.Type:
|
||||
blobTreeOpts := []blobovniczatree.Option{
|
||||
blobovniczatree.WithRootPath(sRead.path),
|
||||
blobovniczatree.WithPermissions(sRead.perm),
|
||||
blobovniczatree.WithBlobovniczaSize(sRead.size),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
|
||||
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
|
||||
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
|
||||
blobovniczatree.WithLogger(c.log),
|
||||
}
|
||||
|
||||
if c.metricsCollector != nil {
|
||||
blobTreeOpts = append(blobTreeOpts,
|
||||
blobovniczatree.WithMetrics(
|
||||
lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobobvnizcaTreeMetrics()),
|
||||
),
|
||||
)
|
||||
}
|
||||
blobovniczaTreeOpts := c.getBlobovniczaTreeOpts(sRead)
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(blobTreeOpts...),
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(blobovniczaTreeOpts...),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
||||
},
|
||||
})
|
||||
case fstree.Type:
|
||||
fstreeOpts := []fstree.Option{
|
||||
fstree.WithPath(sRead.path),
|
||||
fstree.WithPerm(sRead.perm),
|
||||
fstree.WithDepth(sRead.depth),
|
||||
fstree.WithNoSync(sRead.noSync),
|
||||
fstree.WithLogger(c.log),
|
||||
}
|
||||
if c.metricsCollector != nil {
|
||||
fstreeOpts = append(fstreeOpts,
|
||||
fstree.WithMetrics(
|
||||
lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fstreeOpts := c.getFSTreeOpts(sRead)
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: fstree.New(fstreeOpts...),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return true
|
||||
},
|
||||
})
|
||||
case blobtree.Type:
|
||||
blobTreeOpts := c.getBlobTreeOpts(sRead)
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: blobtree.New(blobTreeOpts...),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
||||
},
|
||||
})
|
||||
case badgerstore.Type:
|
||||
badgerStoreOpts := c.getBadgerStoreOpts(sRead)
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: badgerstore.New(badgerStoreOpts...),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
||||
},
|
||||
})
|
||||
default:
|
||||
// should never happen, that has already
|
||||
// been handled: when the config was read
|
||||
|
@ -842,6 +851,82 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
|||
return ss
|
||||
}
|
||||
|
||||
func (c *cfg) getBadgerStoreOpts(sRead subStorageCfg) []badgerstore.Option {
|
||||
badgerStoreOpts := []badgerstore.Option{
|
||||
badgerstore.WithPath(sRead.path),
|
||||
badgerstore.WithPermissions(sRead.perm),
|
||||
badgerstore.WithCompactorsCount(sRead.compactorsCount),
|
||||
badgerstore.WithGCDiscardRatio(sRead.gcDiscardRatio),
|
||||
badgerstore.WithGCInterval(sRead.gcInterval),
|
||||
badgerstore.WithIndexCacheSize(sRead.indexCacheSize),
|
||||
badgerstore.WithMemTablesCount(sRead.memTablesCount),
|
||||
badgerstore.WithValueLogSize(sRead.valueLogFileSize),
|
||||
}
|
||||
if c.metricsCollector != nil {
|
||||
badgerStoreOpts = append(badgerStoreOpts,
|
||||
badgerstore.WithMetrics(
|
||||
lsmetrics.NewBadgerStoreMetrics(sRead.path, c.metricsCollector.BadgerStoreMetrics())))
|
||||
}
|
||||
return badgerStoreOpts
|
||||
}
|
||||
|
||||
func (c *cfg) getBlobTreeOpts(sRead subStorageCfg) []blobtree.Option {
|
||||
blobTreeOpts := []blobtree.Option{
|
||||
blobtree.WithPath(sRead.path),
|
||||
blobtree.WithPerm(sRead.perm),
|
||||
blobtree.WithDepth(sRead.depth),
|
||||
blobtree.WithTargetSize(sRead.size),
|
||||
}
|
||||
if c.metricsCollector != nil {
|
||||
blobTreeOpts = append(blobTreeOpts,
|
||||
blobtree.WithMetrics(
|
||||
lsmetrics.NewBlobTreeMetrics(sRead.path, c.metricsCollector.BlobTreeMetrics()),
|
||||
),
|
||||
)
|
||||
}
|
||||
return blobTreeOpts
|
||||
}
|
||||
|
||||
func (c *cfg) getFSTreeOpts(sRead subStorageCfg) []fstree.Option {
|
||||
fstreeOpts := []fstree.Option{
|
||||
fstree.WithPath(sRead.path),
|
||||
fstree.WithPerm(sRead.perm),
|
||||
fstree.WithDepth(sRead.depth),
|
||||
fstree.WithNoSync(sRead.noSync),
|
||||
fstree.WithLogger(c.log),
|
||||
}
|
||||
if c.metricsCollector != nil {
|
||||
fstreeOpts = append(fstreeOpts,
|
||||
fstree.WithMetrics(
|
||||
lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()),
|
||||
),
|
||||
)
|
||||
}
|
||||
return fstreeOpts
|
||||
}
|
||||
|
||||
func (c *cfg) getBlobovniczaTreeOpts(sRead subStorageCfg) []blobovniczatree.Option {
|
||||
blobTreeOpts := []blobovniczatree.Option{
|
||||
blobovniczatree.WithRootPath(sRead.path),
|
||||
blobovniczatree.WithPermissions(sRead.perm),
|
||||
blobovniczatree.WithBlobovniczaSize(sRead.size),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
|
||||
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
|
||||
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
|
||||
blobovniczatree.WithLogger(c.log),
|
||||
}
|
||||
|
||||
if c.metricsCollector != nil {
|
||||
blobTreeOpts = append(blobTreeOpts,
|
||||
blobovniczatree.WithMetrics(
|
||||
lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobovniczaTreeMetrics()),
|
||||
),
|
||||
)
|
||||
}
|
||||
return blobTreeOpts
|
||||
}
|
||||
|
||||
func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
|
||||
writeCacheOpts := c.getWriteCacheOpts(shCfg)
|
||||
piloramaOpts := c.getPiloramaOpts(shCfg)
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
|
||||
)
|
||||
|
||||
type Config config.Config
|
||||
|
||||
const (
|
||||
IndexCacheSizeDefault = 256 << 20 //256MB
|
||||
MemTablesCountDefault = 32
|
||||
CompactorsCountDefault = 64
|
||||
GCIntervalDefault = 10 * time.Minute
|
||||
GCDiscardRatioDefault = 0.2
|
||||
ValueLogSizeDefault = 1 << 30 //1GB
|
||||
)
|
||||
|
||||
// From wraps config section into Config.
|
||||
func From(c *config.Config) *Config {
|
||||
return (*Config)(c)
|
||||
}
|
||||
|
||||
// Type returns the storage type.
|
||||
func (x *Config) Type() string {
|
||||
return badgerstore.Type
|
||||
}
|
||||
|
||||
// IndexCacheSize returns `index_cache_size` value or IndexCacheSizeDefault.
|
||||
func (x *Config) IndexCacheSize() int64 {
|
||||
s := config.SizeInBytesSafe((*config.Config)(x), "index_cache_size")
|
||||
if s > 0 {
|
||||
return int64(s)
|
||||
}
|
||||
|
||||
return IndexCacheSizeDefault
|
||||
}
|
||||
|
||||
// MemTablesCount returns `mem_tables_count` value or MemTablesCountDefault.
|
||||
func (x *Config) MemTablesCount() int {
|
||||
v := config.IntSafe((*config.Config)(x), "mem_tables_count")
|
||||
if v > 0 {
|
||||
return int(v)
|
||||
}
|
||||
return MemTablesCountDefault
|
||||
}
|
||||
|
||||
// CompactorsCount returns `mem_tables_count` value or CompactorsCountDefault.
|
||||
func (x *Config) CompactorsCount() int {
|
||||
v := config.IntSafe((*config.Config)(x), "compactors_count")
|
||||
if v > 0 {
|
||||
return int(v)
|
||||
}
|
||||
return CompactorsCountDefault
|
||||
}
|
||||
|
||||
// GCInterval returns `gc_interval` value or GCIntervalDefault.
|
||||
func (x *Config) GCInterval() time.Duration {
|
||||
v := config.DurationSafe((*config.Config)(x), "gc_interval")
|
||||
if v > 0 {
|
||||
return v
|
||||
}
|
||||
return GCIntervalDefault
|
||||
}
|
||||
|
||||
// GCDiscardRatio returns `gc_discard_percent` value as ratio value (in range (0.0; 1.0)) or GCDiscardRatioDefault.
|
||||
func (x *Config) GCDiscardRatio() float64 {
|
||||
v := config.Uint32Safe((*config.Config)(x), "gc_discard_percent")
|
||||
if v > 0 && v < 100 {
|
||||
return float64(v) / (float64(100))
|
||||
}
|
||||
return GCDiscardRatioDefault
|
||||
}
|
||||
|
||||
// ValueLogFileSize returns `value_log_file_size` value or ValueLogSizeDefault.
|
||||
func (x *Config) ValueLogFileSize() int64 {
|
||||
s := config.SizeInBytesSafe((*config.Config)(x), "value_log_file_size")
|
||||
if s > 0 {
|
||||
return int64(s)
|
||||
}
|
||||
|
||||
return ValueLogSizeDefault
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
package blobtreeconfig
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
|
||||
)
|
||||
|
||||
// Config is a wrapper over the config section
|
||||
// which provides access to Blobtree configurations.
|
||||
type Config config.Config
|
||||
|
||||
const (
|
||||
// SizeDefault is a default limit of estimates of single Blobtree file size.
|
||||
SizeDefault = 4 * 1024 * 1024
|
||||
|
||||
// DepthDefault is a default shallow dir depth.
|
||||
DepthDefault = 8
|
||||
)
|
||||
|
||||
// From wraps config section into Config.
|
||||
func From(c *config.Config) *Config {
|
||||
return (*Config)(c)
|
||||
}
|
||||
|
||||
// Type returns the storage type.
|
||||
func (x *Config) Type() string {
|
||||
return blobtree.Type
|
||||
}
|
||||
|
||||
// Size returns the value of "size" config parameter.
|
||||
//
|
||||
// Returns SizeDefault if the value is not a positive number.
|
||||
func (x *Config) Size() uint64 {
|
||||
s := config.SizeInBytesSafe(
|
||||
(*config.Config)(x),
|
||||
"size",
|
||||
)
|
||||
|
||||
if s > 0 {
|
||||
return s
|
||||
}
|
||||
|
||||
return SizeDefault
|
||||
}
|
||||
|
||||
// ShallowDepth returns the value of "depth" config parameter.
|
||||
//
|
||||
// Returns ShallowDepthDefault if the value is not a positive number.
|
||||
func (x *Config) Depth() uint64 {
|
||||
d := config.UintSafe(
|
||||
(*config.Config)(x),
|
||||
"depth",
|
||||
)
|
||||
|
||||
if d > 0 {
|
||||
return d
|
||||
}
|
||||
|
||||
return DepthDefault
|
||||
}
|
|
@ -9,7 +9,9 @@ import (
|
|||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
||||
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
)
|
||||
|
@ -55,7 +57,7 @@ func validateConfig(c *config.Config) error {
|
|||
}
|
||||
for i := range blobstor {
|
||||
switch blobstor[i].Type() {
|
||||
case fstree.Type, blobovniczatree.Type:
|
||||
case fstree.Type, blobovniczatree.Type, blobtree.Type, badgerstore.Type:
|
||||
default:
|
||||
return fmt.Errorf("unexpected storage type: %s (shard %d)", blobstor[i].Type(), shardNum)
|
||||
}
|
||||
|
|
|
@ -515,5 +515,6 @@ const (
|
|||
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
||||
FailedToCountWritecacheItems = "failed to count writecache items"
|
||||
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
|
||||
BadgerStoreGCFailed = "failed to run GC on badgerstore"
|
||||
FailedToGetContainerCounters = "failed to get container counters values"
|
||||
)
|
||||
|
|
|
@ -97,7 +97,7 @@ func TestBlobovnicza(t *testing.T) {
|
|||
testPutGet(t, blz, oidtest.Address(), objSizeLim, nil, nil)
|
||||
}
|
||||
|
||||
// blobovnizca accepts object event if full
|
||||
// blobovnicza accepts object event if full
|
||||
testPutGet(t, blz, oidtest.Address(), 1024, func(err error) bool {
|
||||
return err == nil
|
||||
}, nil)
|
||||
|
|
136
pkg/local_object_storage/blobstor/badgerstore/config.go
Normal file
136
pkg/local_object_storage/blobstor/badgerstore/config.go
Normal file
|
@ -0,0 +1,136 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"github.com/dgraph-io/badger/v4/options"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type cfg struct {
|
||||
permissions fs.FileMode
|
||||
compression *compression.Config
|
||||
db badger.Options
|
||||
gcTimeout time.Duration
|
||||
gcDiscardRatio float64
|
||||
metrics Metrics
|
||||
logger *logger.Logger
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
// defaultCfg creates default options to create Store.
|
||||
// Default Badger options:
|
||||
// BaseTableSize: 2MB
|
||||
// BaseLevelSize: 10MB
|
||||
// TableSizeMultiplier: 2
|
||||
// LevelSizeMultiplier: 10
|
||||
// MaxLevels: 7
|
||||
// NumLevelZeroTables: 5
|
||||
// ValueLogFileSize: 1GB
|
||||
//
|
||||
// Badger flushes MemTable directly to Level0.
|
||||
// So for Level0 MemTableSize is used as TableSize https://github.com/dgraph-io/badger/blob/v4.1.0/levels.go#L403.
|
||||
// There is no total size limit for Level0, only NumLevelZeroTables
|
||||
//
|
||||
// Badger uses Dynamic Level Sizes like RocksDB.
|
||||
// See https://github.com/facebook/rocksdb/blob/v3.11/include/rocksdb/options.h#L366 for explanation.
|
||||
func defaultCfg() *cfg {
|
||||
opts := badger.DefaultOptions("/")
|
||||
opts.BlockCacheSize = 0 // compression and encryption are disabled, so block cache should be disabled
|
||||
opts.IndexCacheSize = 256 << 20 // 256MB, to not to keep all indicies in memory
|
||||
opts.Compression = options.None // performed by cfg.compressor
|
||||
opts.Logger = nil
|
||||
opts.MetricsEnabled = false
|
||||
opts.NumLevelZeroTablesStall = math.MaxInt // to not to stall because of Level0 slow compaction
|
||||
opts.NumMemtables = 32 // default memtable size is 64MB, so max memory consumption will be 2GB before stall
|
||||
opts.NumCompactors = 64
|
||||
opts.SyncWrites = true
|
||||
opts.ValueLogMaxEntries = math.MaxUint32 // default vLog file size is 1GB, so size is more clear than entries count
|
||||
opts.ValueThreshold = 0 // to store all values in vLog
|
||||
|
||||
return &cfg{
|
||||
permissions: 0o700,
|
||||
db: opts,
|
||||
gcTimeout: 10 * time.Minute,
|
||||
gcDiscardRatio: 0.2, // for 1GB vLog file GC will perform only if around 200MB could be free
|
||||
metrics: &noopMetrics{},
|
||||
logger: &logger.Logger{Logger: zap.L()},
|
||||
}
|
||||
}
|
||||
|
||||
// WithPath sets BadgerStore directory.
|
||||
func WithPath(dir string) Option {
|
||||
return func(c *cfg) {
|
||||
c.db.Dir = dir
|
||||
c.db.ValueDir = dir
|
||||
}
|
||||
}
|
||||
|
||||
// WithPermissions sets persmission flags.
|
||||
func WithPermissions(p fs.FileMode) Option {
|
||||
return func(c *cfg) {
|
||||
c.permissions = p
|
||||
}
|
||||
}
|
||||
|
||||
// WithIndexCacheSize sets BadgerStore index cache size.
|
||||
func WithIndexCacheSize(sz int64) Option {
|
||||
return func(c *cfg) {
|
||||
c.db.IndexCacheSize = sz
|
||||
}
|
||||
}
|
||||
|
||||
// WithMemTablesCount sets maximum count of memtables.
|
||||
func WithMemTablesCount(count int) Option {
|
||||
return func(c *cfg) {
|
||||
c.db.NumMemtables = count
|
||||
}
|
||||
}
|
||||
|
||||
// WithCompactorsCount sets count of concurrent compactors.
|
||||
func WithCompactorsCount(count int) Option {
|
||||
return func(c *cfg) {
|
||||
c.db.NumCompactors = count
|
||||
}
|
||||
}
|
||||
|
||||
// WithGCInterval sets GC interval value.
|
||||
func WithGCInterval(d time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.gcTimeout = d
|
||||
}
|
||||
}
|
||||
|
||||
// WithGCDiscardRatio sets GC discard ratio.
|
||||
func WithGCDiscardRatio(r float64) Option {
|
||||
return func(c *cfg) {
|
||||
c.gcDiscardRatio = r
|
||||
}
|
||||
}
|
||||
|
||||
// WithValueLogSize sets max value log size.
|
||||
func WithValueLogSize(sz int64) Option {
|
||||
return func(c *cfg) {
|
||||
c.db.ValueLogFileSize = sz
|
||||
}
|
||||
}
|
||||
|
||||
// WithMetrics sets metrics.
|
||||
func WithMetrics(m Metrics) Option {
|
||||
return func(c *cfg) {
|
||||
c.metrics = m
|
||||
}
|
||||
}
|
||||
|
||||
// WithLogger sets logger.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.logger = l
|
||||
}
|
||||
}
|
97
pkg/local_object_storage/blobstor/badgerstore/control.go
Normal file
97
pkg/local_object_storage/blobstor/badgerstore/control.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Close implements common.Storage.
|
||||
func (s *Store) Close() error {
|
||||
s.modeMtx.Lock()
|
||||
defer s.modeMtx.Unlock()
|
||||
|
||||
if !s.opened {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.gcCancel != nil {
|
||||
s.gcCancel()
|
||||
}
|
||||
s.wg.Wait()
|
||||
|
||||
if err := s.db.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
s.opened = false
|
||||
s.cfg.metrics.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init implements common.Storage.
|
||||
func (s *Store) Init() error {
|
||||
s.modeMtx.Lock()
|
||||
defer s.modeMtx.Unlock()
|
||||
|
||||
if !s.opened {
|
||||
return fmt.Errorf("store must be opened before initialization")
|
||||
}
|
||||
|
||||
s.startGC()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) startGC() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.gcCancel = cancel
|
||||
|
||||
t := time.NewTicker(s.cfg.gcTimeout)
|
||||
s.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
if err := s.db.RunValueLogGC(s.cfg.gcDiscardRatio); err == nil {
|
||||
_ = s.db.RunValueLogGC(s.cfg.gcDiscardRatio) // see https://dgraph.io/docs/badger/get-started/#garbage-collection
|
||||
} else {
|
||||
s.cfg.logger.Error(logs.BadgerStoreGCFailed, zap.Error(err), zap.String("path", s.cfg.db.Dir))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Open implements common.Storage.
|
||||
func (s *Store) Open(readOnly bool) error {
|
||||
s.modeMtx.Lock()
|
||||
defer s.modeMtx.Unlock()
|
||||
|
||||
if s.opened {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := util.MkdirAllX(s.cfg.db.Dir, s.cfg.permissions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.cfg.db.ReadOnly = readOnly
|
||||
if s.db, err = badger.Open(s.cfg.db); err != nil {
|
||||
return err
|
||||
}
|
||||
s.opened = true
|
||||
s.cfg.metrics.SetMode(readOnly)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) readOnly() bool {
|
||||
return s.cfg.db.ReadOnly
|
||||
}
|
55
pkg/local_object_storage/blobstor/badgerstore/delete.go
Normal file
55
pkg/local_object_storage/blobstor/badgerstore/delete.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Delete implements common.Storage.
|
||||
func (s *Store) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
|
||||
success := false
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.Delete(time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Delete",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if s.readOnly() {
|
||||
return common.DeleteRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
tx := s.db.NewTransaction(true)
|
||||
defer tx.Discard()
|
||||
|
||||
_, err := tx.Get(key(prm.Address))
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
|
||||
if err = tx.Delete(key(prm.Address)); err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
if err = tx.Commit(); err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
success = true
|
||||
return common.DeleteRes{}, nil
|
||||
}
|
46
pkg/local_object_storage/blobstor/badgerstore/exists.go
Normal file
46
pkg/local_object_storage/blobstor/badgerstore/exists.go
Normal file
|
@ -0,0 +1,46 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Exists implements common.Storage.
|
||||
func (s *Store) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
success := false
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.Exists(time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Exists",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
tx := s.db.NewTransaction(false)
|
||||
defer tx.Discard()
|
||||
|
||||
_, err := tx.Get(key(prm.Address))
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
success = true
|
||||
return common.ExistsRes{Exists: false}, nil
|
||||
}
|
||||
return common.ExistsRes{}, err
|
||||
}
|
||||
|
||||
success = true
|
||||
return common.ExistsRes{Exists: true}, nil
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
||||
)
|
||||
|
||||
func TestGeneric(t *testing.T) {
|
||||
const maxObjectSize = 1 << 16
|
||||
|
||||
helper := func(t *testing.T, dir string) common.Storage {
|
||||
return New(WithPath(dir))
|
||||
}
|
||||
|
||||
newStore := func(t *testing.T) common.Storage {
|
||||
return helper(t, t.TempDir())
|
||||
}
|
||||
|
||||
blobstortest.TestAll(t, newStore, 1024, maxObjectSize)
|
||||
|
||||
t.Run("info", func(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
blobstortest.TestInfo(t, func(t *testing.T) common.Storage {
|
||||
return helper(t, dir)
|
||||
}, Type, dir)
|
||||
})
|
||||
}
|
||||
|
||||
func TestControl(t *testing.T) {
|
||||
const maxObjectSize = 2048
|
||||
|
||||
newStore := func(t *testing.T) common.Storage {
|
||||
return New(WithPath(t.TempDir()))
|
||||
}
|
||||
|
||||
blobstortest.TestControl(t, newStore, 1024, maxObjectSize)
|
||||
}
|
127
pkg/local_object_storage/blobstor/badgerstore/get.go
Normal file
127
pkg/local_object_storage/blobstor/badgerstore/get.go
Normal file
|
@ -0,0 +1,127 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Get implements common.Storage.
|
||||
func (s *Store) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||
success := false
|
||||
size := 0
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.Get(time.Since(startedAt), size, success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Get",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
|
||||
attribute.Bool("raw", prm.Raw),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
data, err := s.getObjectData(prm.Address)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
|
||||
data, err = s.cfg.compression.Decompress(data)
|
||||
if err != nil {
|
||||
return common.GetRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return common.GetRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
}
|
||||
|
||||
success = true
|
||||
size = len(data)
|
||||
return common.GetRes{Object: obj, RawData: data}, nil
|
||||
}
|
||||
|
||||
// GetRange implements common.Storage.
|
||||
func (s *Store) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
success := false
|
||||
size := 0
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.GetRange(time.Since(startedAt), size, success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.GetRange",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", hex.EncodeToString(prm.StorageID)),
|
||||
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
|
||||
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
data, err := s.getObjectData(prm.Address)
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, err
|
||||
}
|
||||
|
||||
data, err = s.cfg.compression.Decompress(data)
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, fmt.Errorf("could not decompress object data: %w", err)
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return common.GetRangeRes{}, fmt.Errorf("could not unmarshal the object: %w", err)
|
||||
}
|
||||
|
||||
from := prm.Range.GetOffset()
|
||||
to := from + prm.Range.GetLength()
|
||||
payload := obj.Payload()
|
||||
|
||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
|
||||
}
|
||||
|
||||
res := common.GetRangeRes{Data: payload[from:to]}
|
||||
success = true
|
||||
size = len(res.Data)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (s *Store) getObjectData(addr oid.Address) ([]byte, error) {
|
||||
var data []byte
|
||||
tx := s.db.NewTransaction(false)
|
||||
defer tx.Discard()
|
||||
|
||||
item, err := tx.Get(key(addr))
|
||||
if err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err = item.ValueCopy(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
121
pkg/local_object_storage/blobstor/badgerstore/iterate.go
Normal file
121
pkg/local_object_storage/blobstor/badgerstore/iterate.go
Normal file
|
@ -0,0 +1,121 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Iterate implements common.Storage.
|
||||
func (s *Store) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
success := false
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.Iterate(time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Iterate",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.Bool("ignore_errors", prm.IgnoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var last []byte
|
||||
opts := badger.DefaultIteratorOptions
|
||||
batch := make([]keyValue, 0, opts.PrefetchSize)
|
||||
opts.PrefetchSize++ // to skip last
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return common.IterateRes{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
batch = batch[:0]
|
||||
err := s.db.View(func(tx *badger.Txn) error {
|
||||
it := tx.NewIterator(opts)
|
||||
defer it.Close()
|
||||
|
||||
for it.Seek(last); it.Valid(); it.Next() {
|
||||
if bytes.Equal(last, it.Item().Key()) {
|
||||
continue
|
||||
}
|
||||
|
||||
var kv keyValue
|
||||
var err error
|
||||
kv.key = it.Item().KeyCopy(nil)
|
||||
kv.value, err = it.Item().ValueCopy(nil)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
batch = append(batch, kv)
|
||||
last = kv.key
|
||||
if len(batch) == opts.PrefetchSize-1 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return common.IterateRes{}, err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return common.IterateRes{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
if err := s.iterateBatch(batch, prm); err != nil {
|
||||
return common.IterateRes{}, err
|
||||
}
|
||||
}
|
||||
|
||||
success = true
|
||||
return common.IterateRes{}, nil
|
||||
}
|
||||
|
||||
func (s *Store) iterateBatch(batch []keyValue, prm common.IteratePrm) error {
|
||||
for _, kv := range batch {
|
||||
addr, err := address(kv.key)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
continue
|
||||
}
|
||||
}
|
||||
data, err := s.cfg.compression.Decompress(kv.value)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
continue
|
||||
}
|
||||
return fmt.Errorf("could not decompress object data: %w", err)
|
||||
}
|
||||
|
||||
if err := prm.Handler(common.IterationElement{
|
||||
Address: addr,
|
||||
ObjectData: data,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type keyValue struct {
|
||||
key, value []byte
|
||||
}
|
28
pkg/local_object_storage/blobstor/badgerstore/keys.go
Normal file
28
pkg/local_object_storage/blobstor/badgerstore/keys.go
Normal file
|
@ -0,0 +1,28 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
func key(add oid.Address) []byte {
|
||||
res := make([]byte, 64)
|
||||
add.Container().Encode(res)
|
||||
add.Object().Encode(res[32:])
|
||||
return res
|
||||
}
|
||||
|
||||
func address(k []byte) (oid.Address, error) {
|
||||
var res oid.Address
|
||||
var containerID cid.ID
|
||||
var objectID oid.ID
|
||||
if err := containerID.Decode(k[:32]); err != nil {
|
||||
return res, err
|
||||
}
|
||||
if err := objectID.Decode(k[32:]); err != nil {
|
||||
return res, err
|
||||
}
|
||||
res.SetContainer(containerID)
|
||||
res.SetObject(objectID)
|
||||
return res, nil
|
||||
}
|
31
pkg/local_object_storage/blobstor/badgerstore/metrics.go
Normal file
31
pkg/local_object_storage/blobstor/badgerstore/metrics.go
Normal file
|
@ -0,0 +1,31 @@
|
|||
package badgerstore
|
||||
|
||||
import "time"
|
||||
|
||||
type Metrics interface {
|
||||
SetParentID(parentID string)
|
||||
|
||||
SetMode(readOnly bool)
|
||||
Close()
|
||||
|
||||
Delete(d time.Duration, success bool)
|
||||
Exists(d time.Duration, success bool)
|
||||
GetRange(d time.Duration, size int, success bool)
|
||||
Get(d time.Duration, size int, success bool)
|
||||
Iterate(d time.Duration, success bool)
|
||||
Put(d time.Duration, size int, success bool)
|
||||
}
|
||||
|
||||
var _ Metrics = (*noopMetrics)(nil)
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
||||
func (*noopMetrics) Close() {}
|
||||
func (*noopMetrics) Delete(time.Duration, bool) {}
|
||||
func (*noopMetrics) Exists(time.Duration, bool) {}
|
||||
func (*noopMetrics) Get(time.Duration, int, bool) {}
|
||||
func (*noopMetrics) GetRange(time.Duration, int, bool) {}
|
||||
func (*noopMetrics) Iterate(time.Duration, bool) {}
|
||||
func (*noopMetrics) Put(time.Duration, int, bool) {}
|
||||
func (*noopMetrics) SetMode(bool) {}
|
||||
func (*noopMetrics) SetParentID(string) {}
|
52
pkg/local_object_storage/blobstor/badgerstore/put.go
Normal file
52
pkg/local_object_storage/blobstor/badgerstore/put.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Put implements common.Storage.
|
||||
func (s *Store) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||
success := false
|
||||
size := 0
|
||||
startedAt := time.Now()
|
||||
|
||||
defer func() {
|
||||
s.cfg.metrics.Put(time.Since(startedAt), size, success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BadgerStore.Put",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", s.cfg.db.Dir),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.Bool("dont_compress", prm.DontCompress),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if s.readOnly() {
|
||||
return common.PutRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
if !prm.DontCompress {
|
||||
prm.RawData = s.cfg.compression.Compress(prm.RawData)
|
||||
}
|
||||
|
||||
b := s.db.NewWriteBatch()
|
||||
defer b.Cancel()
|
||||
|
||||
err := b.Set(key(prm.Address), prm.RawData)
|
||||
if err != nil {
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
if err = b.Flush(); err != nil {
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
success = true
|
||||
size = len(prm.RawData)
|
||||
return common.PutRes{}, nil
|
||||
}
|
68
pkg/local_object_storage/blobstor/badgerstore/store.go
Normal file
68
pkg/local_object_storage/blobstor/badgerstore/store.go
Normal file
|
@ -0,0 +1,68 @@
|
|||
package badgerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
"github.com/dgraph-io/badger/v4"
|
||||
)
|
||||
|
||||
const (
|
||||
Type = "badgerstore"
|
||||
)
|
||||
|
||||
var _ common.Storage = (*Store)(nil)
|
||||
|
||||
type Store struct {
|
||||
cfg *cfg
|
||||
db *badger.DB
|
||||
|
||||
modeMtx *sync.Mutex // protects fields in group below
|
||||
opened bool
|
||||
gcCancel context.CancelFunc
|
||||
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
// New returns new Store instance with opts applied.
|
||||
func New(opts ...Option) *Store {
|
||||
s := &Store{
|
||||
cfg: defaultCfg(),
|
||||
modeMtx: &sync.Mutex{},
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(s.cfg)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Compressor implements common.Storage.
|
||||
func (s *Store) Compressor() *compression.Config {
|
||||
return s.cfg.compression
|
||||
}
|
||||
|
||||
// Path implements common.Storage.
|
||||
func (s *Store) Path() string {
|
||||
return s.cfg.db.Dir
|
||||
}
|
||||
|
||||
// SetCompressor implements common.Storage.
|
||||
func (s *Store) SetCompressor(cc *compression.Config) {
|
||||
s.cfg.compression = cc
|
||||
}
|
||||
|
||||
// SetParentID implements common.Storage.
|
||||
func (s *Store) SetParentID(parentID string) {
|
||||
s.cfg.metrics.SetParentID(parentID)
|
||||
}
|
||||
|
||||
// SetReportErrorFunc implements common.Storage.
|
||||
func (*Store) SetReportErrorFunc(func(string, error)) {}
|
||||
|
||||
// Type implements common.Storage.
|
||||
func (*Store) Type() string {
|
||||
return Type
|
||||
}
|
|
@ -50,7 +50,7 @@ func (b *Blobovniczas) Close() error {
|
|||
b.dbCache.Close() // order important
|
||||
b.activeDBManager.Close()
|
||||
b.commondbManager.Close()
|
||||
|
||||
b.metrics.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Put saves object in the maximum weight blobobnicza.
|
||||
// Put saves object in the maximum weight blobovnicza.
|
||||
//
|
||||
// returns error if could not save object in any blobovnicza.
|
||||
func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||
|
|
83
pkg/local_object_storage/blobstor/blobtree/blobtree.go
Normal file
83
pkg/local_object_storage/blobstor/blobtree/blobtree.go
Normal file
|
@ -0,0 +1,83 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
var _ common.Storage = &BlobTree{}
|
||||
|
||||
type BlobTree struct {
|
||||
cfg cfg
|
||||
dirLock *utilSync.KeyLocker[string]
|
||||
fileLock *utilSync.KeyLocker[string]
|
||||
compressor *compression.Config
|
||||
dispatcher *rootDispatcher
|
||||
suffix atomic.Uint64
|
||||
}
|
||||
|
||||
func New(opts ...Option) *BlobTree {
|
||||
b := &BlobTree{
|
||||
cfg: cfg{
|
||||
targetFileSizeBytes: 4 * 1024 * 1024,
|
||||
rootPath: "./",
|
||||
depth: 3,
|
||||
permissions: 0700,
|
||||
initWorkersCount: 1000,
|
||||
metrics: &noopMetrics{},
|
||||
},
|
||||
dirLock: utilSync.NewKeyLocker[string](),
|
||||
fileLock: utilSync.NewKeyLocker[string](),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(&b.cfg)
|
||||
}
|
||||
|
||||
b.dispatcher = newRootDispatcher()
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *BlobTree) getDir(addr oid.Address) string {
|
||||
sAddr := addr.Object().EncodeToString() + "." + addr.Container().EncodeToString()
|
||||
var sb strings.Builder
|
||||
size := int(b.cfg.depth * (directoryLength + 1)) // (character + slash for every level)
|
||||
sb.Grow(size)
|
||||
|
||||
for i := uint64(0); i < b.cfg.depth; i++ {
|
||||
if i > 0 {
|
||||
sb.WriteRune(filepath.Separator)
|
||||
}
|
||||
sb.WriteString(sAddr[:directoryLength])
|
||||
sAddr = sAddr[directoryLength:]
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func (b *BlobTree) createDir(dir string, isSystemPath bool) error {
|
||||
b.dirLock.Lock(dir)
|
||||
defer b.dirLock.Unlock(dir)
|
||||
|
||||
if !isSystemPath {
|
||||
dir = b.getSystemPath(dir)
|
||||
}
|
||||
if err := util.MkdirAllX(dir, b.cfg.permissions); err != nil {
|
||||
if errors.Is(err, syscall.ENOSPC) {
|
||||
err = common.ErrNoSpace
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
15
pkg/local_object_storage/blobstor/blobtree/config.go
Normal file
15
pkg/local_object_storage/blobstor/blobtree/config.go
Normal file
|
@ -0,0 +1,15 @@
|
|||
package blobtree
|
||||
|
||||
import "io/fs"
|
||||
|
||||
var directoryLength uint64 = 1
|
||||
|
||||
type cfg struct {
|
||||
rootPath string
|
||||
depth uint64
|
||||
targetFileSizeBytes uint64
|
||||
permissions fs.FileMode
|
||||
readOnly bool
|
||||
initWorkersCount int
|
||||
metrics Metrics
|
||||
}
|
209
pkg/local_object_storage/blobstor/blobtree/content.go
Normal file
209
pkg/local_object_storage/blobstor/blobtree/content.go
Normal file
|
@ -0,0 +1,209 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultVersion = 0
|
||||
|
||||
sizeOfVersion = 1
|
||||
sizeOfCount = 8
|
||||
sizeOfDataLength = 8
|
||||
sizeOfContainerID = sha256.Size
|
||||
sizeOfObjectID = sha256.Size
|
||||
|
||||
dataExtension = ".data"
|
||||
)
|
||||
|
||||
var (
|
||||
errFileToSmall = errors.New("invalid file content: not enough bytes to read count of records")
|
||||
errInvalidFileContentVersion = errors.New("invalid file content: not enough bytes to read record version")
|
||||
errInvalidFileContentContainerID = errors.New("invalid file content: not enough bytes to read container ID")
|
||||
errInvalidFileContentObjectID = errors.New("invalid file content: not enough bytes to read object ID")
|
||||
errInvalidFileContentLength = errors.New("invalid file content: not enough bytes to read data length")
|
||||
errInvalidFileContentData = errors.New("invalid file content: not enough bytes to read data")
|
||||
)
|
||||
|
||||
type objectData struct {
|
||||
Version byte
|
||||
Address oid.Address
|
||||
Data []byte
|
||||
}
|
||||
|
||||
func (b *BlobTree) readFileContent(path string) ([]objectData, error) {
|
||||
rawData, err := os.ReadFile(b.getSystemPath(path))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return []objectData{}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return b.unmarshalSlice(rawData)
|
||||
}
|
||||
|
||||
func (b *BlobTree) unmarshalSlice(data []byte) ([]objectData, error) {
|
||||
if len(data) < sizeOfCount {
|
||||
return nil, errFileToSmall
|
||||
}
|
||||
count := binary.LittleEndian.Uint64(data[:8])
|
||||
result := make([]objectData, 0, count)
|
||||
|
||||
data = data[sizeOfCount:]
|
||||
var idx uint64
|
||||
for idx = 0; idx < count; idx++ {
|
||||
record, read, err := b.unmarshalRecord(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, record)
|
||||
data = data[read:]
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) unmarshalRecord(data []byte) (objectData, uint64, error) {
|
||||
if len(data) < sizeOfVersion {
|
||||
return objectData{}, 0, errInvalidFileContentVersion
|
||||
}
|
||||
var result objectData
|
||||
var read uint64
|
||||
result.Version = data[0]
|
||||
if result.Version != defaultVersion {
|
||||
return objectData{}, 0, fmt.Errorf("invalid file content: unknown version %d", result.Version)
|
||||
}
|
||||
read += sizeOfVersion
|
||||
|
||||
if len(data[read:]) < sizeOfContainerID {
|
||||
return objectData{}, 0, errInvalidFileContentContainerID
|
||||
}
|
||||
var contID cid.ID
|
||||
if err := contID.Decode(data[read : read+sizeOfContainerID]); err != nil {
|
||||
return objectData{}, 0, fmt.Errorf("invalid file content: failed to read container ID: %w", err)
|
||||
}
|
||||
read += sizeOfContainerID
|
||||
|
||||
if len(data[read:]) < sizeOfObjectID {
|
||||
return objectData{}, 0, errInvalidFileContentObjectID
|
||||
}
|
||||
var objID oid.ID
|
||||
if err := objID.Decode(data[read : read+sizeOfObjectID]); err != nil {
|
||||
return objectData{}, 0, fmt.Errorf("invalid file content: failed to read object ID: %w", err)
|
||||
}
|
||||
read += sizeOfObjectID
|
||||
|
||||
result.Address.SetContainer(contID)
|
||||
result.Address.SetObject(objID)
|
||||
|
||||
if len(data[read:]) < sizeOfDataLength {
|
||||
return objectData{}, 0, errInvalidFileContentLength
|
||||
}
|
||||
dataLength := binary.LittleEndian.Uint64(data[read : read+sizeOfDataLength])
|
||||
read += sizeOfDataLength
|
||||
|
||||
if uint64(len(data[read:])) < dataLength {
|
||||
return objectData{}, 0, errInvalidFileContentData
|
||||
}
|
||||
result.Data = make([]byte, dataLength)
|
||||
copy(result.Data, data[read:read+dataLength])
|
||||
read += dataLength
|
||||
|
||||
return result, read, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) saveContentToFile(records []objectData, path string) (uint64, error) {
|
||||
data, err := b.marshalSlice(records)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint64(len(data)), b.writeFile(path, data)
|
||||
}
|
||||
|
||||
func (b *BlobTree) writeFile(p string, data []byte) error {
|
||||
p = b.getSystemPath(p)
|
||||
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_EXCL|os.O_SYNC, b.cfg.permissions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = f.Write(data)
|
||||
if err1 := f.Close(); err1 != nil && err == nil {
|
||||
err = err1
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *BlobTree) getSystemPath(path string) string {
|
||||
return filepath.Join(b.cfg.rootPath, path)
|
||||
}
|
||||
|
||||
func (b *BlobTree) marshalSlice(records []objectData) ([]byte, error) {
|
||||
buf := make([]byte, b.estimateSize(records))
|
||||
result := buf
|
||||
binary.LittleEndian.PutUint64(buf, uint64(len(records)))
|
||||
buf = buf[sizeOfCount:]
|
||||
for _, record := range records {
|
||||
written := b.marshalRecord(record, buf)
|
||||
buf = buf[written:]
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) marshalRecord(record objectData, dst []byte) uint64 {
|
||||
var written uint64
|
||||
|
||||
dst[0] = record.Version
|
||||
dst = dst[sizeOfVersion:]
|
||||
written += sizeOfVersion
|
||||
|
||||
record.Address.Container().Encode(dst)
|
||||
dst = dst[sizeOfContainerID:]
|
||||
written += sizeOfContainerID
|
||||
|
||||
record.Address.Object().Encode(dst)
|
||||
dst = dst[sizeOfObjectID:]
|
||||
written += sizeOfObjectID
|
||||
|
||||
binary.LittleEndian.PutUint64(dst, uint64(len(record.Data)))
|
||||
dst = dst[sizeOfDataLength:]
|
||||
written += sizeOfDataLength
|
||||
|
||||
copy(dst, record.Data)
|
||||
written += uint64(len(record.Data))
|
||||
|
||||
return written
|
||||
}
|
||||
|
||||
func (b *BlobTree) estimateSize(records []objectData) uint64 {
|
||||
var result uint64
|
||||
result += sizeOfCount
|
||||
for _, record := range records {
|
||||
result += (sizeOfVersion + sizeOfContainerID + sizeOfObjectID + sizeOfDataLength)
|
||||
result += uint64(len(record.Data))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (b *BlobTree) getFilePath(dir string, idx uint64) string {
|
||||
return filepath.Join(dir, strconv.FormatUint(idx, 16)+dataExtension)
|
||||
}
|
||||
|
||||
func (b *BlobTree) parsePath(path string) (string, uint64, error) {
|
||||
dir := filepath.Dir(path)
|
||||
fileName := strings.TrimSuffix(filepath.Base(path), dataExtension)
|
||||
idx, err := strconv.ParseUint(fileName, 16, 64)
|
||||
if err != nil {
|
||||
return "", 0, fmt.Errorf("failed to parse blobtree path: %w", err)
|
||||
}
|
||||
return dir, idx, nil
|
||||
}
|
102
pkg/local_object_storage/blobstor/blobtree/control.go
Normal file
102
pkg/local_object_storage/blobstor/blobtree/control.go
Normal file
|
@ -0,0 +1,102 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var Type = "blobtree"
|
||||
|
||||
func (b *BlobTree) Open(readOnly bool) error {
|
||||
b.cfg.readOnly = readOnly
|
||||
b.cfg.metrics.SetMode(readOnly)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) Init() error {
|
||||
if err := b.createDir(b.cfg.rootPath, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var eg errgroup.Group
|
||||
eg.SetLimit(b.cfg.initWorkersCount)
|
||||
eg.Go(func() error {
|
||||
return b.initDir(&eg, "")
|
||||
})
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (b *BlobTree) initDir(eg *errgroup.Group, dir string) error {
|
||||
entities, err := os.ReadDir(b.getSystemPath(dir))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, entity := range entities {
|
||||
if entity.IsDir() {
|
||||
eg.Go(func() error {
|
||||
return b.initDir(eg, filepath.Join(dir, entity.Name()))
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
path := filepath.Join(dir, entity.Name())
|
||||
|
||||
if b.isTempFile(entity.Name()) {
|
||||
if err = os.Remove(b.getSystemPath(path)); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
idx, err := b.parseIdx(entity.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b.dispatcher.Init(dir, idx)
|
||||
b.cfg.metrics.IncFilesCount()
|
||||
|
||||
stat, err := os.Stat(b.getSystemPath(path))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if stat.Size() < int64(b.cfg.targetFileSizeBytes) {
|
||||
b.dispatcher.ReturnIdx(dir, idx)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) isTempFile(name string) bool {
|
||||
return strings.Contains(name, tempFileSymbols)
|
||||
}
|
||||
|
||||
func (b *BlobTree) parseIdx(name string) (uint64, error) {
|
||||
return strconv.ParseUint(strings.TrimSuffix(name, dataExtension), 16, 64)
|
||||
}
|
||||
|
||||
func (b *BlobTree) Close() error {
|
||||
b.cfg.metrics.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) Type() string { return Type }
|
||||
func (b *BlobTree) Path() string { return b.cfg.rootPath }
|
||||
|
||||
func (b *BlobTree) SetCompressor(cc *compression.Config) {
|
||||
b.compressor = cc
|
||||
}
|
||||
|
||||
func (b *BlobTree) Compressor() *compression.Config {
|
||||
return b.compressor
|
||||
}
|
||||
|
||||
func (b *BlobTree) SetReportErrorFunc(_ func(string, error)) {}
|
||||
|
||||
func (b *BlobTree) SetParentID(parentID string) {
|
||||
b.cfg.metrics.SetParentID(parentID)
|
||||
}
|
151
pkg/local_object_storage/blobstor/blobtree/delete.go
Normal file
151
pkg/local_object_storage/blobstor/blobtree/delete.go
Normal file
|
@ -0,0 +1,151 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Delete(ctx context.Context, prm common.DeletePrm) (common.DeleteRes, error) {
|
||||
var (
|
||||
success = false
|
||||
startedAt = time.Now()
|
||||
)
|
||||
defer func() {
|
||||
b.cfg.metrics.Delete(time.Since(startedAt), success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Delete",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.cfg.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", string(prm.StorageID)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if b.cfg.readOnly {
|
||||
return common.DeleteRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
var res common.DeleteRes
|
||||
var err error
|
||||
if path, ok := getPathFromStorageID(prm.StorageID); ok {
|
||||
res, err = b.deleteFromPath(prm.Address, path)
|
||||
} else {
|
||||
res, err = b.findAndDelete(prm.Address)
|
||||
}
|
||||
success = err == nil
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (b *BlobTree) deleteFromPath(addr oid.Address, path string) (common.DeleteRes, error) {
|
||||
b.fileLock.Lock(path)
|
||||
defer b.fileLock.Unlock(path)
|
||||
|
||||
dir, idx, err := b.parsePath(path)
|
||||
if err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
|
||||
deleteIdx := -1
|
||||
for i := range records {
|
||||
if records[i].Address.Equals(addr) {
|
||||
deleteIdx = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if deleteIdx == -1 {
|
||||
return common.DeleteRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
if len(records) == 1 {
|
||||
err = os.Remove(b.getSystemPath(path))
|
||||
if err == nil {
|
||||
b.dispatcher.ReturnIdx(dir, idx)
|
||||
b.cfg.metrics.DecFilesCount()
|
||||
}
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
|
||||
records = append(records[:deleteIdx], records[deleteIdx+1:]...)
|
||||
size, err := b.writeToTmpAndRename(records, path)
|
||||
if err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
if size < b.cfg.targetFileSizeBytes {
|
||||
b.dispatcher.ReturnIdx(dir, idx)
|
||||
}
|
||||
return common.DeleteRes{}, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) findAndDelete(addr oid.Address) (common.DeleteRes, error) {
|
||||
dir := b.getDir(addr)
|
||||
idx, err := b.findFileIdx(dir, addr)
|
||||
if err != nil {
|
||||
return common.DeleteRes{}, err
|
||||
}
|
||||
return b.deleteFromPath(addr, b.getFilePath(dir, idx))
|
||||
}
|
||||
|
||||
func (b *BlobTree) findFileIdx(dir string, addr oid.Address) (uint64, error) {
|
||||
entities, err := os.ReadDir(filepath.Join(b.cfg.rootPath, dir))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return 0, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
for _, entity := range entities {
|
||||
if entity.IsDir() {
|
||||
continue
|
||||
}
|
||||
if b.isTempFile(entity.Name()) {
|
||||
continue
|
||||
}
|
||||
idx, err := b.parseIdx(entity.Name())
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
path := b.getFilePath(dir, idx)
|
||||
contains, err := b.fileContainsObject(path, addr)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if contains {
|
||||
return idx, nil
|
||||
}
|
||||
}
|
||||
return 0, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func (b *BlobTree) fileContainsObject(path string, addr oid.Address) (bool, error) {
|
||||
b.fileLock.RLock(path)
|
||||
defer b.fileLock.RUnlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for i := range records {
|
||||
if records[i].Address.Equals(addr) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
94
pkg/local_object_storage/blobstor/blobtree/dispatcher.go
Normal file
94
pkg/local_object_storage/blobstor/blobtree/dispatcher.go
Normal file
|
@ -0,0 +1,94 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type rootDispatcher struct {
|
||||
dispatchers map[string]*dirDispatcher
|
||||
guard sync.Mutex
|
||||
}
|
||||
|
||||
func newRootDispatcher() *rootDispatcher {
|
||||
return &rootDispatcher{
|
||||
dispatchers: make(map[string]*dirDispatcher),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *rootDispatcher) GetIdxForWrite(dir string) uint64 {
|
||||
return d.getDirDispatcher(dir).GetIdxForWrite()
|
||||
}
|
||||
|
||||
func (d *rootDispatcher) ReturnIdx(dir string, idx uint64) {
|
||||
d.getDirDispatcher(dir).ReturnIdx(idx)
|
||||
}
|
||||
|
||||
func (d *rootDispatcher) Init(dir string, idx uint64) {
|
||||
d.getDirDispatcher(dir).Init(idx)
|
||||
}
|
||||
|
||||
func (d *rootDispatcher) getDirDispatcher(dir string) *dirDispatcher {
|
||||
d.guard.Lock()
|
||||
defer d.guard.Unlock()
|
||||
|
||||
if result, ok := d.dispatchers[dir]; ok {
|
||||
return result
|
||||
}
|
||||
|
||||
result := newDirDispatcher(dir)
|
||||
d.dispatchers[dir] = result
|
||||
return result
|
||||
}
|
||||
|
||||
type dirDispatcher struct {
|
||||
dir string
|
||||
guard sync.Mutex
|
||||
indicies map[uint64]struct{}
|
||||
nextIndex uint64
|
||||
}
|
||||
|
||||
func newDirDispatcher(dir string) *dirDispatcher {
|
||||
return &dirDispatcher{
|
||||
dir: dir,
|
||||
indicies: make(map[uint64]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dirDispatcher) GetIdxForWrite() uint64 {
|
||||
d.guard.Lock()
|
||||
defer d.guard.Unlock()
|
||||
|
||||
var result uint64
|
||||
var found bool
|
||||
|
||||
for idx := range d.indicies {
|
||||
result = idx
|
||||
found = true
|
||||
break
|
||||
}
|
||||
|
||||
if found {
|
||||
delete(d.indicies, result)
|
||||
return result
|
||||
}
|
||||
|
||||
result = d.nextIndex
|
||||
d.nextIndex++
|
||||
return result
|
||||
}
|
||||
|
||||
func (d *dirDispatcher) ReturnIdx(idx uint64) {
|
||||
d.guard.Lock()
|
||||
defer d.guard.Unlock()
|
||||
|
||||
d.indicies[idx] = struct{}{}
|
||||
}
|
||||
|
||||
func (d *dirDispatcher) Init(idx uint64) {
|
||||
d.guard.Lock()
|
||||
defer d.guard.Unlock()
|
||||
|
||||
if d.nextIndex <= idx {
|
||||
d.nextIndex = idx + 1
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDispatcher(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d := newRootDispatcher()
|
||||
idx := d.GetIdxForWrite("/dir1")
|
||||
require.Equal(t, uint64(0), idx)
|
||||
d.ReturnIdx("/dir1", idx)
|
||||
|
||||
idx = d.GetIdxForWrite("/dir1")
|
||||
require.Equal(t, uint64(0), idx)
|
||||
|
||||
idx = d.GetIdxForWrite("/dir1")
|
||||
require.Equal(t, uint64(1), idx)
|
||||
|
||||
d.Init("/dir2", 5)
|
||||
idx = d.GetIdxForWrite("/dir2")
|
||||
require.Equal(t, uint64(6), idx)
|
||||
|
||||
idx = d.GetIdxForWrite("/dir2")
|
||||
require.Equal(t, uint64(7), idx)
|
||||
}
|
76
pkg/local_object_storage/blobstor/blobtree/exists.go
Normal file
76
pkg/local_object_storage/blobstor/blobtree/exists.go
Normal file
|
@ -0,0 +1,76 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Exists(ctx context.Context, prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
b.cfg.metrics.Exists(time.Since(startedAt), success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Exists",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.cfg.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", string(prm.StorageID)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var res common.ExistsRes
|
||||
var err error
|
||||
if path, ok := getPathFromStorageID(prm.StorageID); ok {
|
||||
res, err = b.existsFromPath(prm.Address, path)
|
||||
} else {
|
||||
res, err = b.findAndCheckExistance(prm.Address)
|
||||
}
|
||||
success = err == nil
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (b *BlobTree) existsFromPath(addr oid.Address, path string) (common.ExistsRes, error) {
|
||||
b.fileLock.RLock(path)
|
||||
defer b.fileLock.RUnlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return common.ExistsRes{}, err
|
||||
}
|
||||
|
||||
for i := range records {
|
||||
if records[i].Address.Equals(addr) {
|
||||
return common.ExistsRes{
|
||||
Exists: true,
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
return common.ExistsRes{}, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) findAndCheckExistance(addr oid.Address) (common.ExistsRes, error) {
|
||||
dir := b.getDir(addr)
|
||||
_, err := b.findFileIdx(dir, addr)
|
||||
if err == nil {
|
||||
return common.ExistsRes{Exists: true}, nil
|
||||
}
|
||||
|
||||
var notFound *apistatus.ObjectNotFound
|
||||
if errors.As(err, ¬Found) {
|
||||
return common.ExistsRes{}, nil
|
||||
}
|
||||
return common.ExistsRes{}, err
|
||||
}
|
39
pkg/local_object_storage/blobstor/blobtree/generic_test.go
Normal file
39
pkg/local_object_storage/blobstor/blobtree/generic_test.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
||||
)
|
||||
|
||||
func TestGeneric(t *testing.T) {
|
||||
newTreeFromPath := func(path string) common.Storage {
|
||||
return New(
|
||||
WithPath(path),
|
||||
WithDepth(2))
|
||||
}
|
||||
|
||||
newTree := func(t *testing.T) common.Storage {
|
||||
return newTreeFromPath(t.TempDir())
|
||||
}
|
||||
|
||||
blobstortest.TestAll(t, newTree, 2048, 16*1024)
|
||||
|
||||
t.Run("info", func(t *testing.T) {
|
||||
path := t.TempDir()
|
||||
blobstortest.TestInfo(t, func(*testing.T) common.Storage {
|
||||
return newTreeFromPath(path)
|
||||
}, Type, path)
|
||||
})
|
||||
}
|
||||
|
||||
func TestControl(t *testing.T) {
|
||||
newTree := func(t *testing.T) common.Storage {
|
||||
return New(
|
||||
WithPath(t.TempDir()),
|
||||
WithDepth(2))
|
||||
}
|
||||
|
||||
blobstortest.TestControl(t, newTree, 2048, 2048)
|
||||
}
|
129
pkg/local_object_storage/blobstor/blobtree/get.go
Normal file
129
pkg/local_object_storage/blobstor/blobtree/get.go
Normal file
|
@ -0,0 +1,129 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
size = 0
|
||||
)
|
||||
defer func() {
|
||||
b.cfg.metrics.Get(time.Since(startedAt), size, success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Get",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.cfg.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", string(prm.StorageID)),
|
||||
attribute.Bool("raw", prm.Raw),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
res, err := b.get(prm)
|
||||
success = err == nil
|
||||
size = len(res.RawData)
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (b *BlobTree) get(prm common.GetPrm) (common.GetRes, error) {
|
||||
if path, ok := getPathFromStorageID(prm.StorageID); ok {
|
||||
return b.getFromPath(prm.Address, path)
|
||||
}
|
||||
return b.findAndGet(prm.Address)
|
||||
}
|
||||
|
||||
func (b *BlobTree) getFromPath(addr oid.Address, path string) (common.GetRes, error) {
|
||||
b.fileLock.RLock(path)
|
||||
defer b.fileLock.RUnlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
|
||||
for _, record := range records {
|
||||
if record.Address.Equals(addr) {
|
||||
return b.unmarshalGetRes(record)
|
||||
}
|
||||
}
|
||||
|
||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func (b *BlobTree) unmarshalGetRes(record objectData) (common.GetRes, error) {
|
||||
data, err := b.compressor.Decompress(record.Data)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
return common.GetRes{Object: obj, RawData: data}, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) findAndGet(addr oid.Address) (common.GetRes, error) {
|
||||
dir := b.getDir(addr)
|
||||
entities, err := os.ReadDir(filepath.Join(b.cfg.rootPath, dir))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
for _, entity := range entities {
|
||||
|
||||
if entity.IsDir() {
|
||||
continue
|
||||
}
|
||||
if b.isTempFile(entity.Name()) {
|
||||
continue
|
||||
}
|
||||
path := filepath.Join(dir, entity.Name())
|
||||
res, err := b.tryReadObject(path, addr)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
if res.Object != nil {
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
func (b *BlobTree) tryReadObject(path string, addr oid.Address) (common.GetRes, error) {
|
||||
b.fileLock.RLock(path)
|
||||
defer b.fileLock.RUnlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
|
||||
for _, record := range records {
|
||||
if record.Address.Equals(addr) {
|
||||
res, err := b.unmarshalGetRes(record)
|
||||
if err != nil {
|
||||
return common.GetRes{}, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
}
|
||||
return common.GetRes{}, nil
|
||||
}
|
55
pkg/local_object_storage/blobstor/blobtree/get_range.go
Normal file
55
pkg/local_object_storage/blobstor/blobtree/get_range.go
Normal file
|
@ -0,0 +1,55 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *BlobTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.GetRangeRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
size = 0
|
||||
)
|
||||
defer func() {
|
||||
b.cfg.metrics.GetRange(time.Since(startedAt), size, success, prm.StorageID != nil)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.GetRange",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.cfg.rootPath),
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.String("storage_id", string(prm.StorageID)),
|
||||
attribute.String("offset", strconv.FormatUint(prm.Range.GetOffset(), 10)),
|
||||
attribute.String("length", strconv.FormatUint(prm.Range.GetLength(), 10)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
gRes, err := b.get(common.GetPrm{Address: prm.Address, StorageID: prm.StorageID})
|
||||
if err != nil {
|
||||
return common.GetRangeRes{}, err
|
||||
}
|
||||
|
||||
payload := gRes.Object.Payload()
|
||||
from := prm.Range.GetOffset()
|
||||
to := from + prm.Range.GetLength()
|
||||
|
||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||
return common.GetRangeRes{}, logicerr.Wrap(new(apistatus.ObjectOutOfRange))
|
||||
}
|
||||
|
||||
res := common.GetRangeRes{
|
||||
Data: payload[from:to],
|
||||
}
|
||||
size = len(res.Data)
|
||||
success = true
|
||||
return res, nil
|
||||
}
|
97
pkg/local_object_storage/blobstor/blobtree/iterate.go
Normal file
97
pkg/local_object_storage/blobstor/blobtree/iterate.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
err error
|
||||
)
|
||||
defer func() {
|
||||
b.cfg.metrics.Iterate(time.Since(startedAt), err == nil)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Iterate",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", b.cfg.rootPath),
|
||||
attribute.Bool("ignore_errors", prm.IgnoreErrors),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
err = b.iterateDir("", prm)
|
||||
return common.IterateRes{}, err
|
||||
}
|
||||
|
||||
func (b *BlobTree) iterateDir(dir string, prm common.IteratePrm) error {
|
||||
entities, err := os.ReadDir(filepath.Join(b.cfg.rootPath, dir))
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
for _, entity := range entities {
|
||||
if entity.IsDir() {
|
||||
err := b.iterateDir(filepath.Join(dir, entity.Name()), prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if b.isTempFile(entity.Name()) {
|
||||
continue
|
||||
}
|
||||
|
||||
path := filepath.Join(dir, entity.Name())
|
||||
err = b.iterateRecords(path, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) iterateRecords(path string, prm common.IteratePrm) error {
|
||||
b.fileLock.RLock(path)
|
||||
defer b.fileLock.RUnlock(path)
|
||||
|
||||
records, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
for _, record := range records {
|
||||
record.Data, err = b.compressor.Decompress(record.Data)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
err = prm.Handler(common.IterationElement{
|
||||
Address: record.Address,
|
||||
ObjectData: record.Data,
|
||||
StorageID: pathToStorageID(path),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
34
pkg/local_object_storage/blobstor/blobtree/metrics.go
Normal file
34
pkg/local_object_storage/blobstor/blobtree/metrics.go
Normal file
|
@ -0,0 +1,34 @@
|
|||
package blobtree
|
||||
|
||||
import "time"
|
||||
|
||||
type Metrics interface {
|
||||
SetParentID(parentID string)
|
||||
|
||||
SetMode(readOnly bool)
|
||||
Close()
|
||||
|
||||
Delete(d time.Duration, success, withStorageID bool)
|
||||
Exists(d time.Duration, success, withStorageID bool)
|
||||
GetRange(d time.Duration, size int, success, withStorageID bool)
|
||||
Get(d time.Duration, size int, success, withStorageID bool)
|
||||
Iterate(d time.Duration, success bool)
|
||||
Put(d time.Duration, size int, success bool)
|
||||
|
||||
IncFilesCount()
|
||||
DecFilesCount()
|
||||
}
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
||||
func (m *noopMetrics) SetParentID(string) {}
|
||||
func (m *noopMetrics) SetMode(bool) {}
|
||||
func (m *noopMetrics) Close() {}
|
||||
func (m *noopMetrics) Delete(time.Duration, bool, bool) {}
|
||||
func (m *noopMetrics) Exists(time.Duration, bool, bool) {}
|
||||
func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
|
||||
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
||||
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
||||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) IncFilesCount() {}
|
||||
func (m *noopMetrics) DecFilesCount() {}
|
35
pkg/local_object_storage/blobstor/blobtree/option.go
Normal file
35
pkg/local_object_storage/blobstor/blobtree/option.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package blobtree
|
||||
|
||||
import "io/fs"
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
func WithPath(path string) Option {
|
||||
return func(c *cfg) {
|
||||
c.rootPath = path
|
||||
}
|
||||
}
|
||||
|
||||
func WithDepth(depth uint64) Option {
|
||||
return func(c *cfg) {
|
||||
c.depth = depth
|
||||
}
|
||||
}
|
||||
|
||||
func WithPerm(p fs.FileMode) Option {
|
||||
return func(c *cfg) {
|
||||
c.permissions = p
|
||||
}
|
||||
}
|
||||
|
||||
func WithTargetSize(size uint64) Option {
|
||||
return func(c *cfg) {
|
||||
c.targetFileSizeBytes = size
|
||||
}
|
||||
}
|
||||
|
||||
func WithMetrics(m Metrics) Option {
|
||||
return func(c *cfg) {
|
||||
c.metrics = m
|
||||
}
|
||||
}
|
121
pkg/local_object_storage/blobstor/blobtree/put.go
Normal file
121
pkg/local_object_storage/blobstor/blobtree/put.go
Normal file
|
@ -0,0 +1,121 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const (
|
||||
tempFileSymbols = "###"
|
||||
)
|
||||
|
||||
func (b *BlobTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, error) {
|
||||
var (
|
||||
success bool
|
||||
size int
|
||||
startedAt = time.Now()
|
||||
)
|
||||
defer func() {
|
||||
b.cfg.metrics.Put(time.Since(startedAt), size, success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "BlobTree.Put",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", prm.Address.EncodeToString()),
|
||||
attribute.Bool("dont_compress", prm.DontCompress),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if b.cfg.readOnly {
|
||||
return common.PutRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
dir := b.getDir(prm.Address)
|
||||
|
||||
if err := b.createDir(dir, false); err != nil {
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
|
||||
if !prm.DontCompress {
|
||||
prm.RawData = b.compressor.Compress(prm.RawData)
|
||||
}
|
||||
|
||||
path, err := b.saveToLocalDir(prm, dir)
|
||||
if err != nil {
|
||||
return common.PutRes{}, err
|
||||
}
|
||||
|
||||
success = true
|
||||
size = len(prm.RawData)
|
||||
|
||||
return common.PutRes{StorageID: pathToStorageID(path)}, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) saveToLocalDir(prm common.PutPrm, dir string) (string, error) {
|
||||
returnIdx := true
|
||||
idx := b.dispatcher.GetIdxForWrite(dir)
|
||||
path := b.getFilePath(dir, idx)
|
||||
|
||||
b.fileLock.Lock(path)
|
||||
defer b.fileLock.Unlock(path)
|
||||
|
||||
defer func() {
|
||||
if returnIdx {
|
||||
b.dispatcher.ReturnIdx(dir, idx)
|
||||
}
|
||||
}()
|
||||
|
||||
currentContent, err := b.readFileContent(path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
var newRecord objectData
|
||||
newRecord.Address = prm.Address
|
||||
newRecord.Data = prm.RawData
|
||||
|
||||
size, err := b.writeToTmpAndRename(append(currentContent, newRecord), path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
returnIdx = size < b.cfg.targetFileSizeBytes
|
||||
|
||||
return path, nil
|
||||
}
|
||||
|
||||
func (b *BlobTree) writeToTmpAndRename(records []objectData, path string) (uint64, error) {
|
||||
tmpPath := path + tempFileSymbols + strconv.FormatUint(b.suffix.Add(1), 16)
|
||||
|
||||
size, err := b.saveContentToFile(records, tmpPath)
|
||||
if err != nil {
|
||||
_ = os.Remove(b.getSystemPath(tmpPath))
|
||||
return 0, err
|
||||
}
|
||||
|
||||
newFile := false
|
||||
_, err = os.Stat(b.getSystemPath(path))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
newFile = true
|
||||
} else {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := os.Rename(b.getSystemPath(tmpPath), b.getSystemPath(path)); err != nil {
|
||||
_ = os.Remove(b.getSystemPath(tmpPath))
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if newFile {
|
||||
b.cfg.metrics.IncFilesCount()
|
||||
}
|
||||
|
||||
return size, nil
|
||||
}
|
15
pkg/local_object_storage/blobstor/blobtree/storage_id.go
Normal file
15
pkg/local_object_storage/blobstor/blobtree/storage_id.go
Normal file
|
@ -0,0 +1,15 @@
|
|||
package blobtree
|
||||
|
||||
import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
const storageIDPrefix = "blobtree:"
|
||||
|
||||
func getPathFromStorageID(storageID []byte) (string, bool) {
|
||||
return strings.CutPrefix(string(storageID), storageIDPrefix)
|
||||
}
|
||||
|
||||
func pathToStorageID(path string) []byte {
|
||||
return []byte(storageIDPrefix + path)
|
||||
}
|
|
@ -5,7 +5,9 @@ import (
|
|||
"fmt"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/memstore"
|
||||
|
@ -81,6 +83,23 @@ var storages = []storage{
|
|||
)
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "blobtree",
|
||||
create: func(dir string) common.Storage {
|
||||
return blobtree.New(
|
||||
blobtree.WithDepth(2),
|
||||
blobtree.WithPath(dir),
|
||||
)
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "badger",
|
||||
create: func(dir string) common.Storage {
|
||||
return badgerstore.New(
|
||||
badgerstore.WithPath(dir),
|
||||
)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func BenchmarkSubstorageReadPerf(b *testing.B) {
|
||||
|
|
74
pkg/local_object_storage/metrics/badgerstore.go
Normal file
74
pkg/local_object_storage/metrics/badgerstore.go
Normal file
|
@ -0,0 +1,74 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/badgerstore"
|
||||
metrics_impl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
)
|
||||
|
||||
func NewBadgerStoreMetrics(path string, m metrics_impl.BadgerStoreMetrics) badgerstore.Metrics {
|
||||
return &badgerStoreMetrics{
|
||||
path: path,
|
||||
m: m,
|
||||
}
|
||||
}
|
||||
|
||||
type badgerStoreMetrics struct {
|
||||
path, shardID string
|
||||
m metrics_impl.BadgerStoreMetrics
|
||||
}
|
||||
|
||||
// Close implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Close() {
|
||||
m.m.Close(m.shardID, m.path)
|
||||
}
|
||||
|
||||
// Delete implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Delete(d time.Duration, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "Delete", d, success)
|
||||
}
|
||||
|
||||
// Exists implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Exists(d time.Duration, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "Exists", d, success)
|
||||
}
|
||||
|
||||
// Get implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Get(d time.Duration, size int, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "Get", d, success)
|
||||
if success {
|
||||
m.m.AddGet(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
// GetRange implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) GetRange(d time.Duration, size int, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "GetRange", d, success)
|
||||
if success {
|
||||
m.m.AddGet(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Iterate(d time.Duration, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "Iterate", d, success)
|
||||
}
|
||||
|
||||
// Put implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) Put(d time.Duration, size int, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "Put", d, success)
|
||||
if success {
|
||||
m.m.AddPut(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
// SetMode implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) SetMode(readOnly bool) {
|
||||
m.m.SetMode(m.shardID, m.path, readOnly)
|
||||
}
|
||||
|
||||
// SetParentID implements badgerstore.Metrics.
|
||||
func (m *badgerStoreMetrics) SetParentID(parentID string) {
|
||||
m.shardID = parentID
|
||||
}
|
|
@ -8,7 +8,7 @@ import (
|
|||
metrics_impl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
)
|
||||
|
||||
func NewBlobovniczaTreeMetrics(path string, m metrics_impl.BlobobvnizcaMetrics) blobovniczatree.Metrics {
|
||||
func NewBlobovniczaTreeMetrics(path string, m metrics_impl.BlobovniczaMetrics) blobovniczatree.Metrics {
|
||||
return &blobovniczaTreeMetrics{
|
||||
path: path,
|
||||
shardID: undefined,
|
||||
|
@ -19,7 +19,7 @@ func NewBlobovniczaTreeMetrics(path string, m metrics_impl.BlobobvnizcaMetrics)
|
|||
type blobovniczaTreeMetrics struct {
|
||||
shardID string
|
||||
path string
|
||||
m metrics_impl.BlobobvnizcaMetrics
|
||||
m metrics_impl.BlobovniczaMetrics
|
||||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) Blobovnicza() blobovnicza.Metrics {
|
||||
|
@ -35,48 +35,48 @@ func (m *blobovniczaTreeMetrics) SetParentID(parentID string) {
|
|||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) SetMode(readOnly bool) {
|
||||
m.m.SetBlobobvnizcaTreeMode(m.shardID, m.path, readOnly)
|
||||
m.m.SetBlobovniczaTreeMode(m.shardID, m.path, readOnly)
|
||||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) Close() {
|
||||
m.m.CloseBlobobvnizcaTree(m.shardID, m.path)
|
||||
m.m.CloseBlobovniczaTree(m.shardID, m.path)
|
||||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) Delete(d time.Duration, success, withStorageID bool) {
|
||||
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) Exists(d time.Duration, success, withStorageID bool) {
|
||||
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Exists", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Exists", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) GetRange(d time.Duration, size int, success, withStorageID bool) {
|
||||
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "GetRange", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "GetRange", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
if success {
|
||||
m.m.AddBlobobvnizcaTreeGet(m.shardID, m.path, size)
|
||||
m.m.AddBlobovniczaTreeGet(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) Get(d time.Duration, size int, success, withStorageID bool) {
|
||||
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Get", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Get", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
if success {
|
||||
m.m.AddBlobobvnizcaTreeGet(m.shardID, m.path, size)
|
||||
m.m.AddBlobovniczaTreeGet(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) Iterate(d time.Duration, success bool) {
|
||||
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Iterate", d, success, metrics_impl.NullBool{})
|
||||
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Iterate", d, success, metrics_impl.NullBool{})
|
||||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) Put(d time.Duration, size int, success bool) {
|
||||
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Put", d, success, metrics_impl.NullBool{})
|
||||
m.m.BlobovniczaTreeMethodDuration(m.shardID, m.path, "Put", d, success, metrics_impl.NullBool{})
|
||||
if success {
|
||||
m.m.AddBlobobvnizcaTreePut(m.shardID, m.path, size)
|
||||
m.m.AddBlobovniczaTreePut(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
type blobovniczaMetrics struct {
|
||||
m metrics_impl.BlobobvnizcaMetrics
|
||||
m metrics_impl.BlobovniczaMetrics
|
||||
shardID func() string
|
||||
path string
|
||||
}
|
||||
|
|
74
pkg/local_object_storage/metrics/blobtree.go
Normal file
74
pkg/local_object_storage/metrics/blobtree.go
Normal file
|
@ -0,0 +1,74 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobtree"
|
||||
metrics_impl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
)
|
||||
|
||||
func NewBlobTreeMetrics(path string, m metrics_impl.BlobTreeMetrics) blobtree.Metrics {
|
||||
return &blobTreeMetrics{
|
||||
path: path,
|
||||
m: m,
|
||||
}
|
||||
}
|
||||
|
||||
type blobTreeMetrics struct {
|
||||
shardID string
|
||||
path string
|
||||
m metrics_impl.BlobTreeMetrics
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) SetParentID(parentID string) {
|
||||
m.shardID = parentID
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) SetMode(readOnly bool) {
|
||||
m.m.SetBlobTreeMode(m.shardID, m.path, readOnly)
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) Close() {
|
||||
m.m.CloseBlobTree(m.shardID, m.path)
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) Delete(d time.Duration, success, withStorageID bool) {
|
||||
m.m.BlobTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) Exists(d time.Duration, success, withStorageID bool) {
|
||||
m.m.BlobTreeMethodDuration(m.shardID, m.path, "Exists", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) GetRange(d time.Duration, size int, success, withStorageID bool) {
|
||||
m.m.BlobTreeMethodDuration(m.shardID, m.path, "GetRange", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
if success {
|
||||
m.m.AddBlobTreeGet(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) Get(d time.Duration, size int, success, withStorageID bool) {
|
||||
m.m.BlobTreeMethodDuration(m.shardID, m.path, "Get", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
|
||||
if success {
|
||||
m.m.AddBlobTreeGet(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) Iterate(d time.Duration, success bool) {
|
||||
m.m.BlobTreeMethodDuration(m.shardID, m.path, "Iterate", d, success, metrics_impl.NullBool{})
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) Put(d time.Duration, size int, success bool) {
|
||||
m.m.BlobTreeMethodDuration(m.shardID, m.path, "Put", d, success, metrics_impl.NullBool{})
|
||||
if success {
|
||||
m.m.AddBlobTreePut(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) IncFilesCount() {
|
||||
m.m.IncBlobTreeFilesCount(m.shardID, m.path)
|
||||
}
|
||||
|
||||
func (m *blobTreeMetrics) DecFilesCount() {
|
||||
m.m.DecBlobTreeFilesCount(m.shardID, m.path)
|
||||
}
|
98
pkg/metrics/badgerstore.go
Normal file
98
pkg/metrics/badgerstore.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type BadgerStoreMetrics interface {
|
||||
SetMode(shardID, path string, readOnly bool)
|
||||
Close(shardID, path string)
|
||||
MethodDuration(shardID, path string, method string, d time.Duration, success bool)
|
||||
AddPut(shardID, path string, size int)
|
||||
AddGet(shardID, path string, size int)
|
||||
}
|
||||
|
||||
var _ BadgerStoreMetrics = (*badgerStoreMetrics)(nil)
|
||||
|
||||
type badgerStoreMetrics struct {
|
||||
mode *shardIDPathModeValue
|
||||
reqDuration *prometheus.HistogramVec
|
||||
put *prometheus.CounterVec
|
||||
get *prometheus.CounterVec
|
||||
}
|
||||
|
||||
func newbadgerStoreMetrics() *badgerStoreMetrics {
|
||||
return &badgerStoreMetrics{
|
||||
mode: newShardIDPathMode(badgerStoreSubSystem, "mode", "BadgerStore mode"),
|
||||
reqDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: badgerStoreSubSystem,
|
||||
Name: "request_duration_seconds",
|
||||
Help: "Accumulated BadgerStore request process duration",
|
||||
}, []string{shardIDLabel, pathLabel, successLabel, methodLabel}),
|
||||
put: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: badgerStoreSubSystem,
|
||||
Name: "put_bytes",
|
||||
Help: "Accumulated payload size written to BadgerStore",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
get: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: badgerStoreSubSystem,
|
||||
Name: "get_bytes",
|
||||
Help: "Accumulated payload size read from BadgerStore",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
// AddGet implements BadgerStoreMetrics.
|
||||
func (b *badgerStoreMetrics) AddGet(shardID string, path string, size int) {
|
||||
b.get.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Add(float64(size))
|
||||
}
|
||||
|
||||
// AddPut implements BadgerStoreMetrics.
|
||||
func (b *badgerStoreMetrics) AddPut(shardID string, path string, size int) {
|
||||
b.put.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Add(float64(size))
|
||||
}
|
||||
|
||||
// Close implements BadgerStoreMetrics.
|
||||
func (b *badgerStoreMetrics) Close(shardID string, path string) {
|
||||
b.mode.SetMode(shardID, path, closedMode)
|
||||
b.reqDuration.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
b.get.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
b.put.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
}
|
||||
|
||||
// MethodDuration implements BadgerStoreMetrics.
|
||||
func (b *badgerStoreMetrics) MethodDuration(shardID string, path string, method string, d time.Duration, success bool) {
|
||||
b.reqDuration.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
successLabel: strconv.FormatBool(success),
|
||||
methodLabel: method,
|
||||
}).Observe(d.Seconds())
|
||||
}
|
||||
|
||||
// SetMode implements BadgerStoreMetrics.
|
||||
func (b *badgerStoreMetrics) SetMode(shardID string, path string, readOnly bool) {
|
||||
b.mode.SetMode(shardID, path, modeFromBool(readOnly))
|
||||
}
|
|
@ -8,12 +8,12 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type BlobobvnizcaMetrics interface {
|
||||
SetBlobobvnizcaTreeMode(shardID, path string, readOnly bool)
|
||||
CloseBlobobvnizcaTree(shardID, path string)
|
||||
BlobobvnizcaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool)
|
||||
AddBlobobvnizcaTreePut(shardID, path string, size int)
|
||||
AddBlobobvnizcaTreeGet(shardID, path string, size int)
|
||||
type BlobovniczaMetrics interface {
|
||||
SetBlobovniczaTreeMode(shardID, path string, readOnly bool)
|
||||
CloseBlobovniczaTree(shardID, path string)
|
||||
BlobovniczaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool)
|
||||
AddBlobovniczaTreePut(shardID, path string, size int)
|
||||
AddBlobovniczaTreeGet(shardID, path string, size int)
|
||||
|
||||
AddOpenBlobovniczaSize(shardID, path string, size uint64)
|
||||
SubOpenBlobovniczaSize(shardID, path string, size uint64)
|
||||
|
@ -78,11 +78,11 @@ func newBlobovnicza() *blobovnicza {
|
|||
}
|
||||
}
|
||||
|
||||
func (b *blobovnicza) SetBlobobvnizcaTreeMode(shardID, path string, readOnly bool) {
|
||||
func (b *blobovnicza) SetBlobovniczaTreeMode(shardID, path string, readOnly bool) {
|
||||
b.treeMode.SetMode(shardID, path, modeFromBool(readOnly))
|
||||
}
|
||||
|
||||
func (b *blobovnicza) CloseBlobobvnizcaTree(shardID, path string) {
|
||||
func (b *blobovnicza) CloseBlobovniczaTree(shardID, path string) {
|
||||
b.treeMode.SetMode(shardID, path, closedMode)
|
||||
b.treeReqDuration.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
|
@ -96,9 +96,21 @@ func (b *blobovnicza) CloseBlobobvnizcaTree(shardID, path string) {
|
|||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
b.treeOpenSize.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
b.treeOpenItems.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
b.treeOpenCounter.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
}
|
||||
|
||||
func (b *blobovnicza) BlobobvnizcaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) {
|
||||
func (b *blobovnicza) BlobovniczaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) {
|
||||
b.treeReqDuration.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
|
@ -108,14 +120,14 @@ func (b *blobovnicza) BlobobvnizcaTreeMethodDuration(shardID, path string, metho
|
|||
}).Observe(d.Seconds())
|
||||
}
|
||||
|
||||
func (b *blobovnicza) AddBlobobvnizcaTreePut(shardID, path string, size int) {
|
||||
func (b *blobovnicza) AddBlobovniczaTreePut(shardID, path string, size int) {
|
||||
b.treePut.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Add(float64(size))
|
||||
}
|
||||
|
||||
func (b *blobovnicza) AddBlobobvnizcaTreeGet(shardID, path string, size int) {
|
||||
func (b *blobovnicza) AddBlobovniczaTreeGet(shardID, path string, size int) {
|
||||
b.treeGet.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
|
|
120
pkg/metrics/blobtree.go
Normal file
120
pkg/metrics/blobtree.go
Normal file
|
@ -0,0 +1,120 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type BlobTreeMetrics interface {
|
||||
SetBlobTreeMode(shardID, path string, readOnly bool)
|
||||
CloseBlobTree(shardID, path string)
|
||||
BlobTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool)
|
||||
IncBlobTreeFilesCount(shardID, path string)
|
||||
DecBlobTreeFilesCount(shardID, path string)
|
||||
AddBlobTreePut(shardID, path string, size int)
|
||||
AddBlobTreeGet(shardID, path string, size int)
|
||||
}
|
||||
|
||||
type blobTreeMetrics struct {
|
||||
mode *shardIDPathModeValue
|
||||
reqDuration *prometheus.HistogramVec
|
||||
put *prometheus.CounterVec
|
||||
get *prometheus.CounterVec
|
||||
filesCount *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
func newBlobTreeMetrics() *blobTreeMetrics {
|
||||
return &blobTreeMetrics{
|
||||
mode: newShardIDPathMode(blobTreeSubSystem, "mode", "Blob tree mode"),
|
||||
|
||||
reqDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: blobTreeSubSystem,
|
||||
Name: "request_duration_seconds",
|
||||
Help: "Accumulated Blob tree request process duration",
|
||||
}, []string{shardIDLabel, pathLabel, successLabel, methodLabel, withStorageIDLabel}),
|
||||
put: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: blobTreeSubSystem,
|
||||
Name: "put_bytes",
|
||||
Help: "Accumulated payload size written to Blob tree",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
get: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: blobTreeSubSystem,
|
||||
Name: "get_bytes",
|
||||
Help: "Accumulated payload size read from Blob tree",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
filesCount: metrics.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: blobTreeSubSystem,
|
||||
Name: "files_count",
|
||||
Help: "Count of data files in Blob tree",
|
||||
}, []string{shardIDLabel, pathLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *blobTreeMetrics) SetBlobTreeMode(shardID, path string, readOnly bool) {
|
||||
b.mode.SetMode(shardID, path, modeFromBool(readOnly))
|
||||
}
|
||||
|
||||
func (b *blobTreeMetrics) CloseBlobTree(shardID, path string) {
|
||||
b.mode.SetMode(shardID, path, closedMode)
|
||||
b.reqDuration.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
b.get.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
b.put.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
b.filesCount.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
})
|
||||
}
|
||||
|
||||
func (b *blobTreeMetrics) BlobTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) {
|
||||
b.reqDuration.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
successLabel: strconv.FormatBool(success),
|
||||
methodLabel: method,
|
||||
withStorageIDLabel: withStorageID.String(),
|
||||
}).Observe(d.Seconds())
|
||||
}
|
||||
|
||||
func (b *blobTreeMetrics) IncBlobTreeFilesCount(shardID, path string) {
|
||||
b.filesCount.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Inc()
|
||||
}
|
||||
|
||||
func (b *blobTreeMetrics) DecBlobTreeFilesCount(shardID, path string) {
|
||||
b.filesCount.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Dec()
|
||||
}
|
||||
|
||||
func (b *blobTreeMetrics) AddBlobTreePut(shardID, path string, size int) {
|
||||
b.put.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Add(float64(size))
|
||||
}
|
||||
|
||||
func (b *blobTreeMetrics) AddBlobTreeGet(shardID, path string, size int) {
|
||||
b.get.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Add(float64(size))
|
||||
}
|
|
@ -7,6 +7,7 @@ const (
|
|||
fstreeSubSystem = "fstree"
|
||||
blobstoreSubSystem = "blobstore"
|
||||
blobovniczaTreeSubSystem = "blobovnicza_tree"
|
||||
blobTreeSubSystem = "blobtree"
|
||||
metabaseSubSystem = "metabase"
|
||||
piloramaSubSystem = "pilorama"
|
||||
engineSubsystem = "engine"
|
||||
|
@ -21,6 +22,7 @@ const (
|
|||
writeCacheSubsystem = "writecache"
|
||||
grpcServerSubsystem = "grpc_server"
|
||||
policerSubsystem = "policer"
|
||||
badgerStoreSubSystem = "badgerstore"
|
||||
|
||||
successLabel = "success"
|
||||
shardIDLabel = "shard_id"
|
||||
|
|
|
@ -16,10 +16,12 @@ type NodeMetrics struct {
|
|||
epoch prometheus.Gauge
|
||||
fstree *fstreeMetrics
|
||||
blobstore *blobstoreMetrics
|
||||
blobobvnizca *blobovnicza
|
||||
blobovnicza *blobovnicza
|
||||
metabase *metabaseMetrics
|
||||
pilorama *piloramaMetrics
|
||||
grpc *grpcServerMetrics
|
||||
blobTree *blobTreeMetrics
|
||||
badgerStore *badgerStoreMetrics
|
||||
policer *policerMetrics
|
||||
morphClient *morphClientMetrics
|
||||
morphCache *morphCacheMetrics
|
||||
|
@ -39,16 +41,18 @@ func NewNodeMetrics() *NodeMetrics {
|
|||
Name: "epoch",
|
||||
Help: "Current epoch as seen by inner-ring node.",
|
||||
}),
|
||||
fstree: newFSTreeMetrics(),
|
||||
blobstore: newBlobstoreMetrics(),
|
||||
blobobvnizca: newBlobovnicza(),
|
||||
metabase: newMetabaseMetrics(),
|
||||
pilorama: newPiloramaMetrics(),
|
||||
grpc: newGrpcServerMetrics(),
|
||||
policer: newPolicerMetrics(),
|
||||
morphClient: newMorphClientMetrics(),
|
||||
morphCache: newMorphCacheMetrics(namespace),
|
||||
log: logger.NewLogMetrics(namespace),
|
||||
fstree: newFSTreeMetrics(),
|
||||
blobstore: newBlobstoreMetrics(),
|
||||
blobovnicza: newBlobovnicza(),
|
||||
metabase: newMetabaseMetrics(),
|
||||
pilorama: newPiloramaMetrics(),
|
||||
grpc: newGrpcServerMetrics(),
|
||||
policer: newPolicerMetrics(),
|
||||
morphClient: newMorphClientMetrics(),
|
||||
morphCache: newMorphCacheMetrics(namespace),
|
||||
log: logger.NewLogMetrics(namespace),
|
||||
blobTree: newBlobTreeMetrics(),
|
||||
badgerStore: newbadgerStoreMetrics(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,8 +89,8 @@ func (m *NodeMetrics) Blobstore() BlobstoreMetrics {
|
|||
return m.blobstore
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) BlobobvnizcaTreeMetrics() BlobobvnizcaMetrics {
|
||||
return m.blobobvnizca
|
||||
func (m *NodeMetrics) BlobovniczaTreeMetrics() BlobovniczaMetrics {
|
||||
return m.blobovnicza
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) MetabaseMetrics() MetabaseMetrics {
|
||||
|
@ -116,3 +120,11 @@ func (m *NodeMetrics) MorphCacheMetrics() MorphCacheMetrics {
|
|||
func (m *NodeMetrics) LogMetrics() logger.LogMetrics {
|
||||
return m.log
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) BlobTreeMetrics() BlobTreeMetrics {
|
||||
return m.blobTree
|
||||
}
|
||||
|
||||
func (m *NodeMetrics) BadgerStoreMetrics() BadgerStoreMetrics {
|
||||
return m.badgerStore
|
||||
}
|
||||
|
|
|
@ -3,8 +3,8 @@ package sync
|
|||
import "sync"
|
||||
|
||||
type locker struct {
|
||||
mtx sync.Mutex
|
||||
waiters int // not protected by mtx, must used outer mutex to update concurrently
|
||||
mtx sync.RWMutex
|
||||
userCount int // not protected by mtx, must used outer mutex to update concurrently
|
||||
}
|
||||
|
||||
type KeyLocker[K comparable] struct {
|
||||
|
@ -19,26 +19,50 @@ func NewKeyLocker[K comparable]() *KeyLocker[K] {
|
|||
}
|
||||
|
||||
func (l *KeyLocker[K]) Lock(key K) {
|
||||
l.lock(key, false)
|
||||
}
|
||||
|
||||
func (l *KeyLocker[K]) RLock(key K) {
|
||||
l.lock(key, true)
|
||||
}
|
||||
|
||||
func (l *KeyLocker[K]) lock(key K, read bool) {
|
||||
l.lockersMtx.Lock()
|
||||
|
||||
if locker, found := l.lockers[key]; found {
|
||||
locker.waiters++
|
||||
locker.userCount++
|
||||
l.lockersMtx.Unlock()
|
||||
|
||||
locker.mtx.Lock()
|
||||
if read {
|
||||
locker.mtx.RLock()
|
||||
} else {
|
||||
locker.mtx.Lock()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
locker := &locker{
|
||||
waiters: 1,
|
||||
userCount: 1,
|
||||
}
|
||||
if read {
|
||||
locker.mtx.RLock()
|
||||
} else {
|
||||
locker.mtx.Lock()
|
||||
}
|
||||
locker.mtx.Lock()
|
||||
|
||||
l.lockers[key] = locker
|
||||
l.lockersMtx.Unlock()
|
||||
}
|
||||
|
||||
func (l *KeyLocker[K]) Unlock(key K) {
|
||||
l.unlock(key, false)
|
||||
}
|
||||
|
||||
func (l *KeyLocker[K]) RUnlock(key K) {
|
||||
l.unlock(key, true)
|
||||
}
|
||||
|
||||
func (l *KeyLocker[K]) unlock(key K, read bool) {
|
||||
l.lockersMtx.Lock()
|
||||
defer l.lockersMtx.Unlock()
|
||||
|
||||
|
@ -47,10 +71,14 @@ func (l *KeyLocker[K]) Unlock(key K) {
|
|||
return
|
||||
}
|
||||
|
||||
if locker.waiters == 1 {
|
||||
if locker.userCount == 1 {
|
||||
delete(l.lockers, key)
|
||||
}
|
||||
locker.waiters--
|
||||
locker.userCount--
|
||||
|
||||
locker.mtx.Unlock()
|
||||
if read {
|
||||
locker.mtx.RUnlock()
|
||||
} else {
|
||||
locker.mtx.Unlock()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func TestKeyLocker(t *testing.T) {
|
||||
func TestKeyLockerWrite(t *testing.T) {
|
||||
taken := false
|
||||
eg, _ := errgroup.WithContext(context.Background())
|
||||
keyLocker := NewKeyLocker[int]()
|
||||
|
@ -30,3 +30,17 @@ func TestKeyLocker(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
}
|
||||
|
||||
func TestKeyLockerRead(t *testing.T) {
|
||||
eg, _ := errgroup.WithContext(context.Background())
|
||||
keyLocker := NewKeyLocker[int]()
|
||||
for i := 0; i < 100; i++ {
|
||||
eg.Go(func() error {
|
||||
keyLocker.RLock(0)
|
||||
defer keyLocker.RUnlock(0)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue
My suggestion is not the big deal but what do you think about using
filepath.WalkDir
?