Speed up metabase refill #1027

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:feat/resync_speedup into master 2024-09-04 19:51:07 +00:00
23 changed files with 366 additions and 47 deletions


@@ -579,6 +579,8 @@ const (
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
BlobstoreFailedToGetFileinfo = "failed to get file info"
ECFailedToSendToContainerNode = "failed to send EC object to container node"
ECFailedToSaveECPart = "failed to save EC part"
)


@@ -101,6 +101,10 @@ func (b *Blobovnicza) Init() error {
return b.initializeCounters()
}
func (b *Blobovnicza) ObjectsCount() uint64 {
return b.itemsCount.Load()
}
func (b *Blobovnicza) initializeCounters() error {
var size uint64
var items uint64


@@ -0,0 +1,38 @@
package blobovniczatree
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
)
func (b *Blobovniczas) ObjectsCount(ctx context.Context) (uint64, error) {
var (
success bool
startedAt = time.Now()
)
defer func() {
b.metrics.ObjectsCount(time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.ObjectsCount")
defer span.End()
var result uint64
err := b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
shDB := b.getBlobovniczaWithoutCaching(p)
blz, err := shDB.Open()
if err != nil {
return true, err
}
defer shDB.Close()
result += blz.ObjectsCount()
return false, nil
})
if err != nil {
return 0, err
}
return result, nil
}


@@ -24,6 +24,7 @@ type Metrics interface {
SetRebuildStatus(status string)
ObjectMoved(d time.Duration)
SetRebuildPercent(value uint32)
ObjectsCount(d time.Duration, success bool)
Delete(d time.Duration, success, withStorageID bool)
Exists(d time.Duration, success, withStorageID bool)
@@ -47,6 +48,7 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
func (m *noopMetrics) Iterate(time.Duration, bool) {}
func (m *noopMetrics) Put(time.Duration, int, bool) {}
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
func (m *noopMetrics) Blobovnicza() blobovnicza.Metrics {
return &blobovnicza.NoopMetrics{}
}


@@ -15,6 +15,7 @@ type Storage interface {
Type() string
Path() string
ObjectsCount(ctx context.Context) (uint64, error)
SetCompressor(cc *compression.Config)
Compressor() *compression.Config


@@ -467,6 +467,45 @@ func (t *FSTree) countFiles() (uint64, error) {
return counter, nil
}
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.ObjectsCount(time.Since(startedAt), success)
}()
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.ObjectsCount",
trace.WithAttributes(
attribute.String("path", t.RootPath),
))
defer span.End()
var result uint64
err := filepath.WalkDir(t.RootPath,
func(_ string, d fs.DirEntry, _ error) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if !d.IsDir() {
result++
}
return nil
},
)
if err != nil {
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
}
success = true
return result, nil
}
// Type is fstree storage type used in logs and configuration.
const Type = "fstree"


@@ -14,6 +14,7 @@ type Metrics interface {
Put(d time.Duration, size int, success bool)
Get(d time.Duration, size int, success bool)
GetRange(d time.Duration, size int, success bool)
ObjectsCount(d time.Duration, success bool)
}
type noopMetrics struct{}
@@ -27,3 +28,4 @@ func (m *noopMetrics) Exists(time.Duration, bool) {}
func (m *noopMetrics) Put(time.Duration, int, bool) {}
func (m *noopMetrics) Get(time.Duration, int, bool) {}
func (m *noopMetrics) GetRange(time.Duration, int, bool) {}
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}


@@ -1,5 +1,14 @@
package blobstor
import (
"context"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"golang.org/x/sync/errgroup"
)
// DumpInfo returns information about blob stor.
func (b *BlobStor) DumpInfo() Info {
b.modeMtx.RLock()
@@ -15,3 +24,39 @@ func (b *BlobStor) DumpInfo() Info {
SubStorages: sub,
}
}
// ObjectsCount returns Blobstore's total objects count.
func (b *BlobStor) ObjectsCount(ctx context.Context) (uint64, error) {
var err error
startedAt := time.Now()
defer func() {
b.metrics.ObjectsCount(time.Since(startedAt), err == nil)
}()
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.ObjectsCount")
defer span.End()
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
var result atomic.Uint64
eg, egCtx := errgroup.WithContext(ctx)
for i := range b.storage {
i := i
eg.Go(func() error {
v, e := b.storage[i].Storage.ObjectsCount(egCtx)
if e != nil {
return e
}
result.Add(v)

This won't work for blobovniczas:

  1. They don't reclaim memory, so the whole 40 MB DB could be empty.
  2. The B-tree has some overhead for metadata.

This becomes worse the more small objects we have.

Have you considered using a naive but correct implementation? (Traverse the blobovniczas, maybe even count objects, not their size.)


That's why the method is called DiskUsage, not DataSize.
Every blobovnicza stores its data size, but to get this value the DB has to be opened and closed. As it is used only for metrics, I think it is enough to calculate only the disk size.


But then the metrics won't reflect what we expect them to reflect (the progress of the operation).


Agree, fixed.
Now the object count is used for metrics; the data size is hard to compute if compression is enabled.

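The resolution above is what the new `Blobovniczas.ObjectsCount` and the refill-percent logic in `Shard.refillMetabase` below implement: progress is measured in objects rather than bytes, because on-disk sizes are skewed by never-shrinking bolt files and by compression. A minimal, self-contained sketch of the idea (illustrative names, not the PR's exact code):

```go
package main

import (
	"fmt"
	"sync"
)

// progressReporter is an illustrative sketch, not PR code: refill progress is
// expressed as a share of the total object count reported by the blobstore,
// since compressed and non-reclaimed on-disk bytes make size-based progress unreliable.
type progressReporter struct {
	mu    sync.Mutex
	total uint64 // e.g. from BlobStor.ObjectsCount; zero means "no estimate available"
	done  uint64
}

// objectRefilled records one processed object and returns the completion percent,
// or -1 when the total is unknown and no estimate can be given.
func (p *progressReporter) objectRefilled() int {
	if p.total == 0 {
		return -1
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.done++
	return int(p.done * 100 / p.total)
}

func main() {
	p := &progressReporter{total: 4}
	for i := 0; i < 4; i++ {
		fmt.Printf("%d%%\n", p.objectRefilled()) // 25%, 50%, 75%, 100%
	}
}
```

In the merged change itself, the equivalent of this counter is `completedCount*100/totalObjects`, reported via `SetRefillPercent` under a mutex in `Shard.refillMetabase`.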
return nil
})
}
if err = eg.Wait(); err != nil {
return 0, err
}
return result.Load(), nil
}


@@ -163,3 +163,10 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}
func (s *memstoreImpl) ObjectsCount(_ context.Context) (uint64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return uint64(len(s.objs)), nil
}


@@ -13,6 +13,7 @@ type Metrics interface {
Get(d time.Duration, size int, success, withStorageID bool)
Iterate(d time.Duration, success bool)
Put(d time.Duration, size int, success bool)
ObjectsCount(d time.Duration, success bool)
}
type noopMetrics struct{}
@@ -26,3 +27,4 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
func (m *noopMetrics) Iterate(time.Duration, bool) {}
func (m *noopMetrics) Put(time.Duration, int, bool) {}
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}


@@ -233,3 +233,10 @@ func (s *TestStore) SetParentID(string) {}
func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}
func (s *TestStore) ObjectsCount(ctx context.Context) (uint64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.st.ObjectsCount(ctx)
}


@@ -27,6 +27,10 @@ type MetricRegister interface {
IncContainerObjectCounter(shardID, contID, objectType string)
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
WriteCache() metrics.WriteCacheMetrics
GC() metrics.GCMetrics
}


@@ -85,6 +85,18 @@ func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string
m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
}
func (m *metricsWithID) IncRefillObjectsCount(path string, size int, success bool) {
m.mw.IncRefillObjectsCount(m.id, path, size, success)
}
func (m *metricsWithID) SetRefillPercent(path string, percent uint32) {
m.mw.SetRefillPercent(m.id, path, percent)
}
func (m *metricsWithID) SetRefillStatus(path string, status string) {
m.mw.SetRefillStatus(m.id, path, status)
}
// AddShard adds a new shard to the storage engine.
//
// Returns any error encountered that did not allow adding a shard.


@@ -87,6 +87,10 @@ func (m *blobovniczaTreeMetrics) Put(d time.Duration, size int, success bool) {
}
}
func (m *blobovniczaTreeMetrics) ObjectsCount(d time.Duration, success bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "ObjectsCount", d, success, metrics_impl.NullBool{})
}
type blobovniczaMetrics struct {
m metrics_impl.BlobobvnizcaMetrics
shardID func() string


@@ -63,3 +63,7 @@ func (m *blobstoreMetrics) Put(d time.Duration, size int, success bool) {
m.m.AddPut(m.shardID, size)
}
}
func (m *blobstoreMetrics) ObjectsCount(d time.Duration, success bool) {
m.m.MethodDuration(m.shardID, "ObjectsCount", d, success, metrics_impl.NullBool{})
}


@@ -65,3 +65,7 @@ func (m *fstreeMetrics) GetRange(d time.Duration, size int, success bool) {
m.m.AddGet(m.shardID, m.path, size)
}
}
func (m *fstreeMetrics) ObjectsCount(d time.Duration, success bool) {
m.m.MethodDuration(m.shardID, m.path, "ObjectsCount", d, success)
}


@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@@ -181,47 +182,54 @@ func (s *Shard) Init(ctx context.Context) error {
}
func (s *Shard) refillMetabase(ctx context.Context) error {
path := s.metaBase.DumpInfo().Path
s.metricsWriter.SetRefillStatus(path, "running")
s.metricsWriter.SetRefillPercent(path, 0)
var success bool
defer func() {
if success {
s.metricsWriter.SetRefillStatus(path, "completed")
} else {
s.metricsWriter.SetRefillStatus(path, "failed")
}
}()
err := s.metaBase.Reset()
if err != nil {
return fmt.Errorf("could not reset metabase: %w", err)
}
withCount := true
totalObjects, err := s.blobStor.ObjectsCount(ctx)
if err != nil {

Why not move the timeout to the config?


I can't imagine a situation where we would change this.

fyrchik marked this conversation as resolved (outdated)

It looks strange to me: we can wait for 10 minutes and still not receive any metrics.
Ironically, the situations where the disk usage calculation takes too long are also likely the ones where having metrics provides a huge QoL improvement. Metabase resync takes on the order of tens of hours for big disks; 10 minutes is nothing in comparison.

So why don't we allow ourselves to hang here?


Fixed.

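For reference, the bounded pre-count questioned above would have looked roughly like the sketch below. This is a hypothetical helper (the names `objectCounter` and `countWithDeadline` are illustrative, not part of the PR); the merged code instead calls `ObjectsCount` without any deadline and, on error, logs `EngineRefillFailedToGetObjectsCount` and continues without a percent estimate:

```go
package refillsketch

import (
	"context"
	"time"
)

// objectCounter matches the only method the sketch needs from the blobstore.
type objectCounter interface {
	ObjectsCount(ctx context.Context) (uint64, error)
}

// countWithDeadline caps the object pre-count; a false result means the caller
// should skip SetRefillPercent updates while still performing the refill itself.
func countWithDeadline(ctx context.Context, store objectCounter, limit time.Duration) (uint64, bool) {
	countCtx, cancel := context.WithTimeout(ctx, limit)
	defer cancel()
	n, err := store.ObjectsCount(countCtx)
	return n, err == nil
}
```

The merged version goes the other way: the count may take as long as it needs (only honouring ctx cancellation), because a metabase resync itself runs for tens of hours and losing the progress metric would hurt more than the wait.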
s.log.Warn(logs.EngineRefillFailedToGetObjectsCount, zap.Error(err))
withCount = false
}
eg, egCtx := errgroup.WithContext(ctx)
if s.cfg.refillMetabaseWorkersCount > 0 {
eg.SetLimit(s.cfg.refillMetabaseWorkersCount)
}
var completedCount uint64
var metricGuard sync.Mutex
itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
eg.Go(func() error {
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
zap.Stringer("address", addr),
zap.String("err", err.Error()))
return nil
}
var success bool
defer func() {
s.metricsWriter.IncRefillObjectsCount(path, len(data), success)
if withCount {
metricGuard.Lock()
completedCount++
s.metricsWriter.SetRefillPercent(path, uint32(completedCount*100/totalObjects))
metricGuard.Unlock()
}
}()
var err error
switch obj.Type() {
case objectSDK.TypeTombstone:
err = s.refillTombstoneObject(egCtx, obj)
case objectSDK.TypeLock:
err = s.refillLockObject(egCtx, obj)
default:
}
if err != nil {
if err := s.refillObject(egCtx, data, addr, descriptor); err != nil {
return err
}
var mPrm meta.PutPrm
mPrm.SetObject(obj)
mPrm.SetStorageID(descriptor)
_, err = s.metaBase.Put(egCtx, mPrm)
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
return err
}
success = true
return nil
})
@@ -245,6 +253,40 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
return fmt.Errorf("could not sync object counters: %w", err)
}
success = true
s.metricsWriter.SetRefillPercent(path, 100)
return nil
}
func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, descriptor []byte) error {
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil {
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
zap.Stringer("address", addr),
zap.String("err", err.Error()))
return nil
}
var err error
switch obj.Type() {
case objectSDK.TypeTombstone:
err = s.refillTombstoneObject(ctx, obj)
case objectSDK.TypeLock:
err = s.refillLockObject(ctx, obj)
default:
}
if err != nil {
return err
}
var mPrm meta.PutPrm
mPrm.SetObject(obj)
mPrm.SetStorageID(descriptor)
_, err = s.metaBase.Put(ctx, mPrm)
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
return err
}
return nil
}


@@ -126,11 +126,15 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
}),
}
mm := NewMetricStore()
sh := New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})))
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})),
WithMetricsWriter(mm),
)
require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background()))
@@ -157,7 +161,8 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
WithBlobStorOptions(blobOpts...),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
WithRefillMetabase(true))
WithRefillMetabase(true),
WithMetricsWriter(mm))
require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background()))
@@ -185,6 +190,8 @@ func TestRefillMetabase(t *testing.T) {
}),
}
mm := NewMetricStore()
sh := New(
WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...),
@@ -194,6 +201,7 @@ func TestRefillMetabase(t *testing.T) {
),
WithPiloramaOptions(
pilorama.WithPath(filepath.Join(p, "pilorama"))),
WithMetricsWriter(mm),
)
// open Blobstor
@@ -362,6 +370,7 @@ func TestRefillMetabase(t *testing.T) {
),
WithPiloramaOptions(
pilorama.WithPath(filepath.Join(p, "pilorama_another"))),
WithMetricsWriter(mm),
)
// open Blobstor
@@ -389,4 +398,7 @@ func TestRefillMetabase(t *testing.T) {
checkObj(object.AddressOf(tombObj), tombObj)
checkTombMembers(true)
checkLocked(t, cnrLocked, locked)
require.Equal(t, int64(len(mObjs)+2), mm.refillCount) // 1 lock + 1 tomb
require.Equal(t, "completed", mm.refillStatus)
require.Equal(t, uint32(100), mm.refillPercent)
}


@@ -21,13 +21,28 @@ import (
)
type metricsStore struct {
mtx sync.Mutex
objCounters map[string]uint64
cnrSize map[string]int64
cnrCount map[string]uint64
pldSize int64
mode mode.Mode
errCounter int64
mtx sync.Mutex
objCounters map[string]uint64
cnrSize map[string]int64
cnrCount map[string]uint64
pldSize int64
mode mode.Mode
errCounter int64
refillCount int64
refillSize int64
refillPercent uint32
refillStatus string
}
func NewMetricStore() *metricsStore {
return &metricsStore{
objCounters: map[string]uint64{
"phy": 0,
"logic": 0,
},
cnrSize: make(map[string]int64),
cnrCount: make(map[string]uint64),
}
}
func (m *metricsStore) SetShardID(_ string) {}
@@ -155,6 +170,28 @@ func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool
return v, ok
}
func (m *metricsStore) IncRefillObjectsCount(_ string, size int, success bool) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.refillCount++
m.refillSize += int64(size)
}
func (m *metricsStore) SetRefillPercent(_ string, percent uint32) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.refillPercent = percent
}
func (m *metricsStore) SetRefillStatus(_ string, status string) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.refillStatus = status
}
func TestCounters(t *testing.T) {
t.Parallel()
@@ -361,14 +398,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
}),
}
mm := &metricsStore{
objCounters: map[string]uint64{
"phy": 0,
"logic": 0,
},
cnrSize: make(map[string]int64),
cnrCount: make(map[string]uint64),
}
mm := NewMetricStore()
sh := New(
WithID(NewIDFromBytes([]byte{})),


@@ -51,6 +51,7 @@ func TestShardReload(t *testing.T) {
WithMetaBaseOptions(metaOpts...),
WithPiloramaOptions(
pilorama.WithPath(filepath.Join(p, "pilorama"))),
WithMetricsWriter(NewMetricStore()),
}
sh := New(opts...)


@@ -90,6 +90,12 @@ type MetricsWriter interface {
IncContainerObjectsCount(cnrID string, objectType string)
// SubContainerObjectsCount subtracts container object count.
SubContainerObjectsCount(cnrID string, objectType string, value uint64)
// IncRefillObjectsCount increments refill objects count.
IncRefillObjectsCount(path string, size int, success bool)
// SetRefillPercent sets refill percent.
SetRefillPercent(path string, percent uint32)
// SetRefillStatus sets refill status.
SetRefillStatus(path string, status string)
}
type cfg struct {


@@ -1,6 +1,7 @@
package metrics
import (
"strconv"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@@ -23,6 +24,9 @@ type EngineMetrics interface {
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncContainerObjectCounter(shardID, contID, objectType string)
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
WriteCache() WriteCacheMetrics
GC() GCMetrics
@@ -37,6 +41,11 @@ type engineMetrics struct {
mode *shardIDModeValue
contObjCounter *prometheus.GaugeVec
refillStatus *shardIDPathModeValue
refillObjCounter *prometheus.GaugeVec
refillPayloadCounter *prometheus.GaugeVec
refillPercentCounter *prometheus.GaugeVec
gc *gcMetrics
writeCache *writeCacheMetrics
}
@@ -55,10 +64,14 @@ func newEngineMetrics() *engineMetrics {
objectCounter: newEngineGaugeVector("objects_total",
"Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.",
[]string{shardIDLabel, typeLabel}),
gc: newGCMetrics(),
writeCache: newWriteCacheMetrics(),
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
gc: newGCMetrics(),
writeCache: newWriteCacheMetrics(),
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
refillStatus: newShardIDPathMode(engineSubsystem, "resync_metabase_status", "Resync from blobstore to metabase status"),
refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
}
}
@@ -106,7 +119,11 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.refillObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.refillPayloadCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.mode.Delete(shardID)
m.refillStatus.DeleteByShardID(shardID)
}
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
@@ -168,3 +185,31 @@ func (m *engineMetrics) WriteCache() WriteCacheMetrics {
func (m *engineMetrics) GC() GCMetrics {
return m.gc
}
func (m *engineMetrics) IncRefillObjectsCount(shardID, path string, size int, success bool) {
m.refillObjCounter.With(
prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
successLabel: strconv.FormatBool(success),
},
).Inc()
m.refillPayloadCounter.With(
prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
successLabel: strconv.FormatBool(success),
},
).Add(float64(size))
}
func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
m.refillPercentCounter.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Set(float64(percent))
}
func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
m.refillStatus.SetMode(shardID, path, status)
}


@@ -74,6 +74,12 @@ func (m *shardIDPathModeValue) Delete(shardID, path string) {
})
}
func (m *shardIDPathModeValue) DeleteByShardID(shardID string) {
m.modeValue.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
})
}
func modeFromBool(readOnly bool) string {
modeValue := readWriteMode
if readOnly {