diff --git a/pkg/local_object_storage/blobovnicza/blobovnicza.go b/pkg/local_object_storage/blobovnicza/blobovnicza.go index 21c9f9e5..ecd1dc5e 100644 --- a/pkg/local_object_storage/blobovnicza/blobovnicza.go +++ b/pkg/local_object_storage/blobovnicza/blobovnicza.go @@ -31,6 +31,8 @@ type cfg struct { objSizeLimit uint64 log *logger.Logger + + metrics Metrics } type boltDBCfg struct { @@ -52,6 +54,7 @@ func defaultCfg(c *cfg) { fullSizeLimit: 1 << 30, // 1GB objSizeLimit: 1 << 20, // 1MB log: &logger.Logger{Logger: zap.L()}, + metrics: &noopMetrics{}, } } @@ -112,3 +115,10 @@ func WithReadOnly(ro bool) Option { c.boltOptions.ReadOnly = ro } } + +// WithMetrics returns an option to set metrics storage. +func WithMetrics(m Metrics) Option { + return func(c *cfg) { + c.metrics = m + } +} diff --git a/pkg/local_object_storage/blobovnicza/control.go b/pkg/local_object_storage/blobovnicza/control.go index 84274528..c776afe0 100644 --- a/pkg/local_object_storage/blobovnicza/control.go +++ b/pkg/local_object_storage/blobovnicza/control.go @@ -35,6 +35,9 @@ func (b *Blobovnicza) Open() error { ) b.boltDB, err = bbolt.Open(b.path, b.perm, b.boltOptions) + if err == nil { + b.metrics.IncOpenCount() + } return err } @@ -81,7 +84,9 @@ func (b *Blobovnicza) Init() error { return fmt.Errorf("can't determine DB size: %w", err) } - b.filled.Store(uint64(info.Size())) + sz := uint64(info.Size()) + b.filled.Store(sz) + b.metrics.IncSize(sz) return err } @@ -91,5 +96,10 @@ func (b *Blobovnicza) Close() error { zap.String("path", b.path), ) - return b.boltDB.Close() + err := b.boltDB.Close() + if err == nil { + b.metrics.DecOpenCount() + b.metrics.DecSize(b.filled.Load()) + } + return err } diff --git a/pkg/local_object_storage/blobovnicza/metrics.go b/pkg/local_object_storage/blobovnicza/metrics.go new file mode 100644 index 00000000..1ffb7b1e --- /dev/null +++ b/pkg/local_object_storage/blobovnicza/metrics.go @@ -0,0 +1,16 @@ +package blobovnicza + +type Metrics interface { + IncOpenCount() + DecOpenCount() + + IncSize(size uint64) + DecSize(size uint64) +} + +type noopMetrics struct{} + +func (m *noopMetrics) IncOpenCount() {} +func (m *noopMetrics) DecOpenCount() {} +func (m *noopMetrics) IncSize(uint64) {} +func (m *noopMetrics) DecSize(uint64) {} diff --git a/pkg/local_object_storage/blobovnicza/sizes.go b/pkg/local_object_storage/blobovnicza/sizes.go index 1cc100d1..bdbc77d1 100644 --- a/pkg/local_object_storage/blobovnicza/sizes.go +++ b/pkg/local_object_storage/blobovnicza/sizes.go @@ -41,10 +41,12 @@ func upperPowerOfTwo(v uint64) uint64 { func (b *Blobovnicza) incSize(sz uint64) { b.filled.Add(sz) + b.metrics.IncSize(sz) } func (b *Blobovnicza) decSize(sz uint64) { b.filled.Add(^(sz - 1)) + b.metrics.DecSize(sz) } func (b *Blobovnicza) full() bool { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index e80d7a15..f362e2a0 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -102,15 +102,35 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { return nil, fmt.Errorf("could not generate shard ID: %w", err) } + opts = e.appendMetrics(id, opts) + + sh := shard.New(append(opts, + shard.WithID(id), + shard.WithExpiredTombstonesCallback(e.processExpiredTombstones), + shard.WithExpiredLocksCallback(e.processExpiredLocks), + shard.WithDeletedLockCallback(e.processDeletedLocks), + shard.WithReportErrorFunc(e.reportShardErrorBackground), + )...) + + if err := sh.UpdateID(); err != nil { + return nil, fmt.Errorf("could not update shard ID: %w", err) + } + + return sh, err +} + +func (e *StorageEngine) appendMetrics(id *shard.ID, opts []shard.Option) []shard.Option { e.mtx.RLock() + defer e.mtx.RUnlock() if e.metrics != nil { - opts = append(opts, shard.WithMetricsWriter( - &metricsWithID{ - id: id.String(), - mw: e.metrics, - }, - ), + opts = append(opts, + shard.WithMetricsWriter( + &metricsWithID{ + id: id.String(), + mw: e.metrics, + }, + ), shard.WithExtraWriteCacheOptions(writecache.WithMetrics( &writeCacheMetrics{ shardID: id.String(), @@ -126,21 +146,7 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { ) } - e.mtx.RUnlock() - - sh := shard.New(append(opts, - shard.WithID(id), - shard.WithExpiredTombstonesCallback(e.processExpiredTombstones), - shard.WithExpiredLocksCallback(e.processExpiredLocks), - shard.WithDeletedLockCallback(e.processDeletedLocks), - shard.WithReportErrorFunc(e.reportShardErrorBackground), - )...) - - if err := sh.UpdateID(); err != nil { - return nil, fmt.Errorf("could not update shard ID: %w", err) - } - - return sh, err + return opts } func (e *StorageEngine) addShard(sh *shard.Shard) error {