Speed up metabase refill #1027
23 changed files with 366 additions and 47 deletions
|
@ -579,6 +579,8 @@ const (
|
|||
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
|
||||
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
|
||||
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
|
||||
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
|
||||
BlobstoreFailedToGetFileinfo = "failed to get file info"
|
||||
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
||||
ECFailedToSaveECPart = "failed to save EC part"
|
||||
)
|
||||
|
|
|
@ -101,6 +101,10 @@ func (b *Blobovnicza) Init() error {
|
|||
return b.initializeCounters()
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) ObjectsCount() uint64 {
|
||||
return b.itemsCount.Load()
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) initializeCounters() error {
|
||||
var size uint64
|
||||
var items uint64
|
||||
|
|
38
pkg/local_object_storage/blobstor/blobovniczatree/count.go
Normal file
38
pkg/local_object_storage/blobstor/blobovniczatree/count.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
)
|
||||
|
||||
func (b *Blobovniczas) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||
var (
|
||||
success bool
|
||||
startedAt = time.Now()
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.ObjectsCount(time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.ObjectsCount")
|
||||
defer span.End()
|
||||
|
||||
var result uint64
|
||||
err := b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
|
||||
shDB := b.getBlobovniczaWithoutCaching(p)
|
||||
blz, err := shDB.Open()
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
defer shDB.Close()
|
||||
|
||||
result += blz.ObjectsCount()
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
|
@ -24,6 +24,7 @@ type Metrics interface {
|
|||
SetRebuildStatus(status string)
|
||||
ObjectMoved(d time.Duration)
|
||||
SetRebuildPercent(value uint32)
|
||||
ObjectsCount(d time.Duration, success bool)
|
||||
|
||||
Delete(d time.Duration, success, withStorageID bool)
|
||||
Exists(d time.Duration, success, withStorageID bool)
|
||||
|
@ -47,6 +48,7 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
|
|||
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
||||
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
||||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
|
||||
func (m *noopMetrics) Blobovnicza() blobovnicza.Metrics {
|
||||
return &blobovnicza.NoopMetrics{}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ type Storage interface {
|
|||
|
||||
Type() string
|
||||
Path() string
|
||||
ObjectsCount(ctx context.Context) (uint64, error)
|
||||
|
||||
SetCompressor(cc *compression.Config)
|
||||
Compressor() *compression.Config
|
||||
|
|
|
@ -467,6 +467,45 @@ func (t *FSTree) countFiles() (uint64, error) {
|
|||
return counter, nil
|
||||
}
|
||||
|
||||
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
t.metrics.ObjectsCount(time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.ObjectsCount",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", t.RootPath),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var result uint64
|
||||
|
||||
err := filepath.WalkDir(t.RootPath,
|
||||
func(_ string, d fs.DirEntry, _ error) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if !d.IsDir() {
|
||||
result++
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
||||
}
|
||||
success = true
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Type is fstree storage type used in logs and configuration.
|
||||
const Type = "fstree"
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ type Metrics interface {
|
|||
Put(d time.Duration, size int, success bool)
|
||||
Get(d time.Duration, size int, success bool)
|
||||
GetRange(d time.Duration, size int, success bool)
|
||||
ObjectsCount(d time.Duration, success bool)
|
||||
}
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
@ -27,3 +28,4 @@ func (m *noopMetrics) Exists(time.Duration, bool) {}
|
|||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) Get(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) GetRange(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
|
||||
|
|
|
@ -1,5 +1,14 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// DumpInfo returns information about blob stor.
|
||||
func (b *BlobStor) DumpInfo() Info {
|
||||
b.modeMtx.RLock()
|
||||
|
@ -15,3 +24,39 @@ func (b *BlobStor) DumpInfo() Info {
|
|||
SubStorages: sub,
|
||||
}
|
||||
}
|
||||
|
||||
// ObjectsCount returns Blobstore's total objects count.
|
||||
func (b *BlobStor) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||
var err error
|
||||
startedAt := time.Now()
|
||||
defer func() {
|
||||
b.metrics.ObjectsCount(time.Since(startedAt), err == nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.ObjectsCount")
|
||||
defer span.End()
|
||||
|
||||
b.modeMtx.RLock()
|
||||
defer b.modeMtx.RUnlock()
|
||||
|
||||
var result atomic.Uint64
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
for i := range b.storage {
|
||||
i := i
|
||||
eg.Go(func() error {
|
||||
v, e := b.storage[i].Storage.ObjectsCount(egCtx)
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
result.Add(v)
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err = eg.Wait(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return result.Load(), nil
|
||||
}
|
||||
|
|
|
@ -163,3 +163,10 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
|
|||
func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
|
||||
return common.RebuildRes{}, nil
|
||||
}
|
||||
|
||||
func (s *memstoreImpl) ObjectsCount(_ context.Context) (uint64, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return uint64(len(s.objs)), nil
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ type Metrics interface {
|
|||
Get(d time.Duration, size int, success, withStorageID bool)
|
||||
Iterate(d time.Duration, success bool)
|
||||
Put(d time.Duration, size int, success bool)
|
||||
ObjectsCount(d time.Duration, success bool)
|
||||
}
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
@ -26,3 +27,4 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
|
|||
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
||||
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
||||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
|
||||
|
|
|
@ -233,3 +233,10 @@ func (s *TestStore) SetParentID(string) {}
|
|||
func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
|
||||
return common.RebuildRes{}, nil
|
||||
}
|
||||
|
||||
func (s *TestStore) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return s.st.ObjectsCount(ctx)
|
||||
}
|
||||
|
|
|
@ -27,6 +27,10 @@ type MetricRegister interface {
|
|||
IncContainerObjectCounter(shardID, contID, objectType string)
|
||||
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||
|
||||
IncRefillObjectsCount(shardID, path string, size int, success bool)
|
||||
SetRefillPercent(shardID, path string, percent uint32)
|
||||
SetRefillStatus(shardID, path, status string)
|
||||
|
||||
WriteCache() metrics.WriteCacheMetrics
|
||||
GC() metrics.GCMetrics
|
||||
}
|
||||
|
|
|
@ -85,6 +85,18 @@ func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string
|
|||
m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) IncRefillObjectsCount(path string, size int, success bool) {
|
||||
m.mw.IncRefillObjectsCount(m.id, path, size, success)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) SetRefillPercent(path string, percent uint32) {
|
||||
m.mw.SetRefillPercent(m.id, path, percent)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) SetRefillStatus(path string, status string) {
|
||||
m.mw.SetRefillStatus(m.id, path, status)
|
||||
}
|
||||
|
||||
// AddShard adds a new shard to the storage engine.
|
||||
//
|
||||
// Returns any error encountered that did not allow adding a shard.
|
||||
|
|
|
@ -87,6 +87,10 @@ func (m *blobovniczaTreeMetrics) Put(d time.Duration, size int, success bool) {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) ObjectsCount(d time.Duration, success bool) {
|
||||
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "ObjectsCount", d, success, metrics_impl.NullBool{})
|
||||
}
|
||||
|
||||
type blobovniczaMetrics struct {
|
||||
m metrics_impl.BlobobvnizcaMetrics
|
||||
shardID func() string
|
||||
|
|
|
@ -63,3 +63,7 @@ func (m *blobstoreMetrics) Put(d time.Duration, size int, success bool) {
|
|||
m.m.AddPut(m.shardID, size)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *blobstoreMetrics) ObjectsCount(d time.Duration, success bool) {
|
||||
m.m.MethodDuration(m.shardID, "ObjectsCount", d, success, metrics_impl.NullBool{})
|
||||
}
|
||||
|
|
|
@ -65,3 +65,7 @@ func (m *fstreeMetrics) GetRange(d time.Duration, size int, success bool) {
|
|||
m.m.AddGet(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *fstreeMetrics) ObjectsCount(d time.Duration, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "ObjectsCount", d, success)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
|
@ -181,47 +182,54 @@ func (s *Shard) Init(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (s *Shard) refillMetabase(ctx context.Context) error {
|
||||
path := s.metaBase.DumpInfo().Path
|
||||
s.metricsWriter.SetRefillStatus(path, "running")
|
||||
s.metricsWriter.SetRefillPercent(path, 0)
|
||||
var success bool
|
||||
defer func() {
|
||||
if success {
|
||||
s.metricsWriter.SetRefillStatus(path, "completed")
|
||||
} else {
|
||||
s.metricsWriter.SetRefillStatus(path, "failed")
|
||||
}
|
||||
}()
|
||||
|
||||
err := s.metaBase.Reset()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not reset metabase: %w", err)
|
||||
}
|
||||
|
||||
withCount := true
|
||||
totalObjects, err := s.blobStor.ObjectsCount(ctx)
|
||||
if err != nil {
|
||||
acid-ant
commented
Why not to move timeout in config? Why not to move timeout in config?
dstepanov-yadro
commented
can't imagine the situation when we will change this. can't imagine the situation when we will change this.
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It looks strange to me: we can wait for 10 minutes and still do not receive any metrics. So why don't we allow ourselves hang here? It looks strange to me: we can wait for 10 minutes and still do not receive any metrics.
Ironically situations when disk usage calculation takes too long are also likely the ones where having metrics provides a huge QOL improvement. metabase resync duration has the order of _tens of hours_ for big disks, 10 minute is nothing in comparison.
So why don't we allow ourselves hang here?
dstepanov-yadro
commented
Fixed. Fixed.
|
||||
s.log.Warn(logs.EngineRefillFailedToGetObjectsCount, zap.Error(err))
|
||||
withCount = false
|
||||
}
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
if s.cfg.refillMetabaseWorkersCount > 0 {
|
||||
eg.SetLimit(s.cfg.refillMetabaseWorkersCount)
|
||||
}
|
||||
|
||||
var completedCount uint64
|
||||
var metricGuard sync.Mutex
|
||||
itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||
eg.Go(func() error {
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
||||
zap.Stringer("address", addr),
|
||||
zap.String("err", err.Error()))
|
||||
return nil
|
||||
var success bool
|
||||
defer func() {
|
||||
s.metricsWriter.IncRefillObjectsCount(path, len(data), success)
|
||||
if withCount {
|
||||
metricGuard.Lock()
|
||||
completedCount++
|
||||
s.metricsWriter.SetRefillPercent(path, uint32(completedCount*100/totalObjects))
|
||||
metricGuard.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
var err error
|
||||
switch obj.Type() {
|
||||
case objectSDK.TypeTombstone:
|
||||
err = s.refillTombstoneObject(egCtx, obj)
|
||||
case objectSDK.TypeLock:
|
||||
err = s.refillLockObject(egCtx, obj)
|
||||
default:
|
||||
}
|
||||
if err != nil {
|
||||
if err := s.refillObject(egCtx, data, addr, descriptor); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var mPrm meta.PutPrm
|
||||
mPrm.SetObject(obj)
|
||||
mPrm.SetStorageID(descriptor)
|
||||
|
||||
_, err = s.metaBase.Put(egCtx, mPrm)
|
||||
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
||||
return err
|
||||
}
|
||||
|
||||
success = true
|
||||
return nil
|
||||
})
|
||||
|
||||
|
@ -245,6 +253,40 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
|||
return fmt.Errorf("could not sync object counters: %w", err)
|
||||
}
|
||||
|
||||
success = true
|
||||
s.metricsWriter.SetRefillPercent(path, 100)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, descriptor []byte) error {
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
||||
zap.Stringer("address", addr),
|
||||
zap.String("err", err.Error()))
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
switch obj.Type() {
|
||||
case objectSDK.TypeTombstone:
|
||||
err = s.refillTombstoneObject(ctx, obj)
|
||||
case objectSDK.TypeLock:
|
||||
err = s.refillLockObject(ctx, obj)
|
||||
default:
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var mPrm meta.PutPrm
|
||||
mPrm.SetObject(obj)
|
||||
mPrm.SetStorageID(descriptor)
|
||||
|
||||
_, err = s.metaBase.Put(ctx, mPrm)
|
||||
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -126,11 +126,15 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
}),
|
||||
}
|
||||
|
||||
mm := NewMetricStore()
|
||||
|
||||
sh := New(
|
||||
WithID(NewIDFromBytes([]byte{})),
|
||||
WithBlobStorOptions(blobOpts...),
|
||||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})))
|
||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})),
|
||||
WithMetricsWriter(mm),
|
||||
)
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
|
@ -157,7 +161,8 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
WithBlobStorOptions(blobOpts...),
|
||||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
|
||||
WithRefillMetabase(true))
|
||||
WithRefillMetabase(true),
|
||||
WithMetricsWriter(mm))
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
|
@ -185,6 +190,8 @@ func TestRefillMetabase(t *testing.T) {
|
|||
}),
|
||||
}
|
||||
|
||||
mm := NewMetricStore()
|
||||
|
||||
sh := New(
|
||||
WithID(NewIDFromBytes([]byte{})),
|
||||
WithBlobStorOptions(blobOpts...),
|
||||
|
@ -194,6 +201,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
),
|
||||
WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(p, "pilorama"))),
|
||||
WithMetricsWriter(mm),
|
||||
)
|
||||
|
||||
// open Blobstor
|
||||
|
@ -362,6 +370,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
),
|
||||
WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(p, "pilorama_another"))),
|
||||
WithMetricsWriter(mm),
|
||||
)
|
||||
|
||||
// open Blobstor
|
||||
|
@ -389,4 +398,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
checkObj(object.AddressOf(tombObj), tombObj)
|
||||
checkTombMembers(true)
|
||||
checkLocked(t, cnrLocked, locked)
|
||||
require.Equal(t, int64(len(mObjs)+2), mm.refillCount) // 1 lock + 1 tomb
|
||||
require.Equal(t, "completed", mm.refillStatus)
|
||||
require.Equal(t, uint32(100), mm.refillPercent)
|
||||
}
|
||||
|
|
|
@ -28,6 +28,21 @@ type metricsStore struct {
|
|||
pldSize int64
|
||||
mode mode.Mode
|
||||
errCounter int64
|
||||
refillCount int64
|
||||
refillSize int64
|
||||
refillPercent uint32
|
||||
refillStatus string
|
||||
}
|
||||
|
||||
func NewMetricStore() *metricsStore {
|
||||
return &metricsStore{
|
||||
objCounters: map[string]uint64{
|
||||
"phy": 0,
|
||||
"logic": 0,
|
||||
},
|
||||
cnrSize: make(map[string]int64),
|
||||
cnrCount: make(map[string]uint64),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *metricsStore) SetShardID(_ string) {}
|
||||
|
@ -155,6 +170,28 @@ func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool
|
|||
return v, ok
|
||||
}
|
||||
|
||||
func (m *metricsStore) IncRefillObjectsCount(_ string, size int, success bool) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
m.refillCount++
|
||||
m.refillSize += int64(size)
|
||||
}
|
||||
|
||||
func (m *metricsStore) SetRefillPercent(_ string, percent uint32) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
m.refillPercent = percent
|
||||
}
|
||||
|
||||
func (m *metricsStore) SetRefillStatus(_ string, status string) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
m.refillStatus = status
|
||||
}
|
||||
|
||||
func TestCounters(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -361,14 +398,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
|
|||
}),
|
||||
}
|
||||
|
||||
mm := &metricsStore{
|
||||
objCounters: map[string]uint64{
|
||||
"phy": 0,
|
||||
"logic": 0,
|
||||
},
|
||||
cnrSize: make(map[string]int64),
|
||||
cnrCount: make(map[string]uint64),
|
||||
}
|
||||
mm := NewMetricStore()
|
||||
|
||||
sh := New(
|
||||
WithID(NewIDFromBytes([]byte{})),
|
||||
|
|
|
@ -51,6 +51,7 @@ func TestShardReload(t *testing.T) {
|
|||
WithMetaBaseOptions(metaOpts...),
|
||||
WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(p, "pilorama"))),
|
||||
WithMetricsWriter(NewMetricStore()),
|
||||
}
|
||||
|
||||
sh := New(opts...)
|
||||
|
|
|
@ -90,6 +90,12 @@ type MetricsWriter interface {
|
|||
IncContainerObjectsCount(cnrID string, objectType string)
|
||||
// SubContainerObjectsCount subtracts container object count.
|
||||
SubContainerObjectsCount(cnrID string, objectType string, value uint64)
|
||||
// IncRefillObjectsCount increments refill objects count.
|
||||
IncRefillObjectsCount(path string, size int, success bool)
|
||||
// SetRefillPercent sets refill percent.
|
||||
SetRefillPercent(path string, percent uint32)
|
||||
// SetRefillStatus sets refill status.
|
||||
SetRefillStatus(path string, status string)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -23,6 +24,9 @@ type EngineMetrics interface {
|
|||
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||
IncContainerObjectCounter(shardID, contID, objectType string)
|
||||
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||
IncRefillObjectsCount(shardID, path string, size int, success bool)
|
||||
SetRefillPercent(shardID, path string, percent uint32)
|
||||
SetRefillStatus(shardID, path, status string)
|
||||
|
||||
WriteCache() WriteCacheMetrics
|
||||
GC() GCMetrics
|
||||
|
@ -37,6 +41,11 @@ type engineMetrics struct {
|
|||
mode *shardIDModeValue
|
||||
contObjCounter *prometheus.GaugeVec
|
||||
|
||||
refillStatus *shardIDPathModeValue
|
||||
refillObjCounter *prometheus.GaugeVec
|
||||
refillPayloadCounter *prometheus.GaugeVec
|
||||
refillPercentCounter *prometheus.GaugeVec
|
||||
|
||||
gc *gcMetrics
|
||||
writeCache *writeCacheMetrics
|
||||
}
|
||||
|
@ -59,6 +68,10 @@ func newEngineMetrics() *engineMetrics {
|
|||
writeCache: newWriteCacheMetrics(),
|
||||
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
|
||||
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
|
||||
refillStatus: newShardIDPathMode(engineSubsystem, "resync_metabase_status", "Resync from blobstore to metabase status"),
|
||||
refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
|
||||
refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
|
||||
refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,7 +119,11 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
|
|||
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.refillObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.refillPayloadCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.mode.Delete(shardID)
|
||||
m.refillStatus.DeleteByShardID(shardID)
|
||||
}
|
||||
|
||||
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
|
||||
|
@ -168,3 +185,31 @@ func (m *engineMetrics) WriteCache() WriteCacheMetrics {
|
|||
func (m *engineMetrics) GC() GCMetrics {
|
||||
return m.gc
|
||||
}
|
||||
|
||||
func (m *engineMetrics) IncRefillObjectsCount(shardID, path string, size int, success bool) {
|
||||
m.refillObjCounter.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
successLabel: strconv.FormatBool(success),
|
||||
},
|
||||
).Inc()
|
||||
m.refillPayloadCounter.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
successLabel: strconv.FormatBool(success),
|
||||
},
|
||||
).Add(float64(size))
|
||||
}
|
||||
|
||||
func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
|
||||
m.refillPercentCounter.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Set(float64(percent))
|
||||
}
|
||||
|
||||
func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
|
||||
m.refillStatus.SetMode(shardID, path, status)
|
||||
}
|
||||
|
|
|
@ -74,6 +74,12 @@ func (m *shardIDPathModeValue) Delete(shardID, path string) {
|
|||
})
|
||||
}
|
||||
|
||||
func (m *shardIDPathModeValue) DeleteByShardID(shardID string) {
|
||||
m.modeValue.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
})
|
||||
}
|
||||
|
||||
func modeFromBool(readOnly bool) string {
|
||||
modeValue := readWriteMode
|
||||
if readOnly {
|
||||
|
|
Loading…
Reference in a new issue
This won't work for blobovniczas:
This becomes worse the more small objects we have.
Have you considered using naive but correct implementation? (traverse blobovniczas, maybe even count objects, not their size).
That's why the method is called
DiskUsage
, notDataSize
.Every blobovnicza stores data size, but to get this value it is required to open/close db. As it is used only for metrics, I think it is enough to calculate only disk size.
But then metrics won't reflect what we deem them reflect (progress of operation).
Agree, fixed.
Now object count used for metrics, data size is hard to compute if compression is enabled.