forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
5 changed files with 67 additions and 23 deletions
|
@ -31,6 +31,8 @@ type cfg struct {
|
|||
objSizeLimit uint64
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
metrics Metrics
|
||||
}
|
||||
|
||||
type boltDBCfg struct {
|
||||
|
@ -52,6 +54,7 @@ func defaultCfg(c *cfg) {
|
|||
fullSizeLimit: 1 << 30, // 1GB
|
||||
objSizeLimit: 1 << 20, // 1MB
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
metrics: &noopMetrics{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -112,3 +115,10 @@ func WithReadOnly(ro bool) Option {
|
|||
c.boltOptions.ReadOnly = ro
|
||||
}
|
||||
}
|
||||
|
||||
// WithMetrics returns an option to set metrics storage.
|
||||
func WithMetrics(m Metrics) Option {
|
||||
return func(c *cfg) {
|
||||
c.metrics = m
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,9 @@ func (b *Blobovnicza) Open() error {
|
|||
)
|
||||
|
||||
b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions)
|
||||
if err == nil {
|
||||
b.metrics.IncOpenCount()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
@ -81,7 +84,9 @@ func (b *Blobovnicza) Init() error {
|
|||
return fmt.Errorf("can't determine DB size: %w", err)
|
||||
}
|
||||
|
||||
b.filled.Store(uint64(info.Size()))
|
||||
sz := uint64(info.Size())
|
||||
b.filled.Store(sz)
|
||||
b.metrics.IncSize(sz)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -91,5 +96,10 @@ func (b *Blobovnicza) Close() error {
|
|||
zap.String("path", b.path),
|
||||
)
|
||||
|
||||
return b.boltDB.Close()
|
||||
err := b.boltDB.Close()
|
||||
if err == nil {
|
||||
b.metrics.DecOpenCount()
|
||||
b.metrics.DecSize(b.filled.Load())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
16
pkg/local_object_storage/blobovnicza/metrics.go
Normal file
16
pkg/local_object_storage/blobovnicza/metrics.go
Normal file
|
@ -0,0 +1,16 @@
|
|||
package blobovnicza
|
||||
|
||||
type Metrics interface {
|
||||
IncOpenCount()
|
||||
DecOpenCount()
|
||||
|
||||
IncSize(size uint64)
|
||||
DecSize(size uint64)
|
||||
}
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
||||
func (m *noopMetrics) IncOpenCount() {}
|
||||
func (m *noopMetrics) DecOpenCount() {}
|
||||
func (m *noopMetrics) IncSize(uint64) {}
|
||||
func (m *noopMetrics) DecSize(uint64) {}
|
|
@ -41,10 +41,12 @@ func upperPowerOfTwo(v uint64) uint64 {
|
|||
|
||||
func (b *Blobovnicza) incSize(sz uint64) {
|
||||
b.filled.Add(sz)
|
||||
b.metrics.IncSize(sz)
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) decSize(sz uint64) {
|
||||
b.filled.Add(^(sz - 1))
|
||||
b.metrics.DecSize(sz)
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) full() bool {
|
||||
|
|
|
@ -102,15 +102,35 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
|
|||
return nil, fmt.Errorf("could not generate shard ID: %w", err)
|
||||
}
|
||||
|
||||
opts = e.appendMetrics(id, opts)
|
||||
|
||||
sh := shard.New(append(opts,
|
||||
shard.WithID(id),
|
||||
shard.WithExpiredTombstonesCallback(e.processExpiredTombstones),
|
||||
shard.WithExpiredLocksCallback(e.processExpiredLocks),
|
||||
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
||||
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
||||
)...)
|
||||
|
||||
if err := sh.UpdateID(); err != nil {
|
||||
return nil, fmt.Errorf("could not update shard ID: %w", err)
|
||||
}
|
||||
|
||||
return sh, err
|
||||
}
|
||||
|
||||
func (e *StorageEngine) appendMetrics(id *shard.ID, opts []shard.Option) []shard.Option {
|
||||
e.mtx.RLock()
|
||||
defer e.mtx.RUnlock()
|
||||
|
||||
if e.metrics != nil {
|
||||
opts = append(opts, shard.WithMetricsWriter(
|
||||
&metricsWithID{
|
||||
id: id.String(),
|
||||
mw: e.metrics,
|
||||
},
|
||||
),
|
||||
opts = append(opts,
|
||||
shard.WithMetricsWriter(
|
||||
&metricsWithID{
|
||||
id: id.String(),
|
||||
mw: e.metrics,
|
||||
},
|
||||
),
|
||||
shard.WithExtraWriteCacheOptions(writecache.WithMetrics(
|
||||
&writeCacheMetrics{
|
||||
shardID: id.String(),
|
||||
|
@ -126,21 +146,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
|
|||
)
|
||||
}
|
||||
|
||||
e.mtx.RUnlock()
|
||||
|
||||
sh := shard.New(append(opts,
|
||||
shard.WithID(id),
|
||||
shard.WithExpiredTombstonesCallback(e.processExpiredTombstones),
|
||||
shard.WithExpiredLocksCallback(e.processExpiredLocks),
|
||||
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
||||
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
||||
)...)
|
||||
|
||||
if err := sh.UpdateID(); err != nil {
|
||||
return nil, fmt.Errorf("could not update shard ID: %w", err)
|
||||
}
|
||||
|
||||
return sh, err
|
||||
return opts
|
||||
}
|
||||
|
||||
func (e *StorageEngine) addShard(sh *shard.Shard) error {
|
||||
|
|
Loading…
Reference in a new issue