diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 64e5f771..7a5bcf24 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -579,6 +579,8 @@ const ( EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node" EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node" EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node" + EngineRefillFailedToGetDiskUsage = "failed to get blobstor disk usage, no resync percent estimation is available" + BlobstoreFailedToGetFileinfo = "failed to get file info" ECFailedToSendToContainerNode = "failed to send EC object to container node" ECFailedToSaveECPart = "failed to save EC part" ) diff --git a/pkg/local_object_storage/blobstor/info.go b/pkg/local_object_storage/blobstor/info.go index 2fd62af8..32c4656d 100644 --- a/pkg/local_object_storage/blobstor/info.go +++ b/pkg/local_object_storage/blobstor/info.go @@ -1,5 +1,18 @@ package blobstor +import ( + "context" + "io/fs" + "path/filepath" + "sync/atomic" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + // DumpInfo returns information about blob stor. func (b *BlobStor) DumpInfo() Info { b.modeMtx.RLock() @@ -15,3 +28,52 @@ func (b *BlobStor) DumpInfo() Info { SubStorages: sub, } } + +// DiskUsage returns Blobstore's disk usage in bytes. +func (b *BlobStor) DiskUsage(ctx context.Context) (uint64, error) { + var err error + startedAt := time.Now() + defer func() { + b.metrics.DiskUsage(time.Since(startedAt), err == nil) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.DiskUsage") + defer span.End() + + b.modeMtx.RLock() + defer b.modeMtx.RUnlock() + + var result atomic.Uint64 + + eg, egCtx := errgroup.WithContext(ctx) + for i := range b.storage { + path := b.storage[i].Storage.Path() + eg.Go(func() error { + return filepath.WalkDir(path, func(path string, d fs.DirEntry, _ error) error { + select { + case <-egCtx.Done(): + return egCtx.Err() + default: + } + + if d.IsDir() { + return nil + } + + info, err := d.Info() + if err != nil { + b.log.Warn(logs.BlobstoreFailedToGetFileinfo, zap.String("path", path), zap.Error(err)) + return nil + } + result.Add(uint64(info.Size())) + return nil + }) + }) + } + + if err = eg.Wait(); err != nil { + return 0, err + } + + return result.Load(), nil +} diff --git a/pkg/local_object_storage/blobstor/metrics.go b/pkg/local_object_storage/blobstor/metrics.go index 4a7b4009..8900754c 100644 --- a/pkg/local_object_storage/blobstor/metrics.go +++ b/pkg/local_object_storage/blobstor/metrics.go @@ -13,6 +13,7 @@ type Metrics interface { Get(d time.Duration, size int, success, withStorageID bool) Iterate(d time.Duration, success bool) Put(d time.Duration, size int, success bool) + DiskUsage(d time.Duration, success bool) } type noopMetrics struct{} @@ -26,3 +27,4 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {} func (m *noopMetrics) Get(time.Duration, int, bool, bool) {} func (m *noopMetrics) Iterate(time.Duration, bool) {} func (m *noopMetrics) Put(time.Duration, int, bool) {} +func (m *noopMetrics) DiskUsage(time.Duration, bool) {} diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index 91dfa876..6b99dff7 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -27,6 +27,10 @@ type MetricRegister interface { IncContainerObjectCounter(shardID, contID, objectType string) SubContainerObjectCounter(shardID, contID, objectType string, v uint64) + IncRefillObjectsCount(shardID, path string, size int, success bool) + SetRefillPercent(shardID, path string, percent uint32) + SetRefillStatus(shardID, path, status string) + WriteCache() metrics.WriteCacheMetrics GC() metrics.GCMetrics } diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index ccf2e9da..54d3e7d6 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -85,6 +85,18 @@ func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value) } +func (m *metricsWithID) IncRefillObjectsCount(path string, size int, success bool) { + m.mw.IncRefillObjectsCount(m.id, path, size, success) +} + +func (m *metricsWithID) SetRefillPercent(path string, percent uint32) { + m.mw.SetRefillPercent(m.id, path, percent) +} + +func (m *metricsWithID) SetRefillStatus(path string, status string) { + m.mw.SetRefillStatus(m.id, path, status) +} + // AddShard adds a new shard to the storage engine. // // Returns any error encountered that did not allow adding a shard. diff --git a/pkg/local_object_storage/metrics/blobstore.go b/pkg/local_object_storage/metrics/blobstore.go index 48249e89..498668fb 100644 --- a/pkg/local_object_storage/metrics/blobstore.go +++ b/pkg/local_object_storage/metrics/blobstore.go @@ -63,3 +63,7 @@ func (m *blobstoreMetrics) Put(d time.Duration, size int, success bool) { m.m.AddPut(m.shardID, size) } } + +func (m *blobstoreMetrics) DiskUsage(d time.Duration, success bool) { + m.m.MethodDuration(m.shardID, "DiskUsage", d, success, metrics_impl.NullBool{}) +} diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index d080e291..93a49ff5 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -181,47 +182,51 @@ func (s *Shard) Init(ctx context.Context) error { } func (s *Shard) refillMetabase(ctx context.Context) error { + path := s.metaBase.DumpInfo().Path + s.metricsWriter.SetRefillStatus(path, "running") + s.metricsWriter.SetRefillPercent(path, 0) + var success bool + defer func() { + if success { + s.metricsWriter.SetRefillStatus(path, "completed") + } else { + s.metricsWriter.SetRefillStatus(path, "failed") + } + }() + err := s.metaBase.Reset() if err != nil { return fmt.Errorf("could not reset metabase: %w", err) } + withDiskUsage := true + totalDiskUsage, err := s.blobStor.DiskUsage(ctx) + if err != nil { + s.log.Warn(logs.EngineRefillFailedToGetDiskUsage, zap.Error(err)) + withDiskUsage = false + } + eg, egCtx := errgroup.WithContext(ctx) if s.cfg.refillMetabaseWorkersCount > 0 { eg.SetLimit(s.cfg.refillMetabaseWorkersCount) } + var totalCompletedSize atomic.Uint64 itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { eg.Go(func() error { - obj := objectSDK.New() - if err := obj.Unmarshal(data); err != nil { - s.log.Warn(logs.ShardCouldNotUnmarshalObject, - zap.Stringer("address", addr), - zap.String("err", err.Error())) - return nil - } + var success bool + defer func() { + s.metricsWriter.IncRefillObjectsCount(path, len(data), success) + if withDiskUsage { + v := totalCompletedSize.Add(uint64(len(data))) + s.metricsWriter.SetRefillPercent(path, uint32(v*100/totalDiskUsage)) + } + }() - var err error - switch obj.Type() { - case objectSDK.TypeTombstone: - err = s.refillTombstoneObject(egCtx, obj) - case objectSDK.TypeLock: - err = s.refillLockObject(egCtx, obj) - default: - } - if err != nil { + if err := s.refillObject(egCtx, data, addr, descriptor); err != nil { return err } - - var mPrm meta.PutPrm - mPrm.SetObject(obj) - mPrm.SetStorageID(descriptor) - - _, err = s.metaBase.Put(egCtx, mPrm) - if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { - return err - } - + success = true return nil }) @@ -245,6 +250,40 @@ func (s *Shard) refillMetabase(ctx context.Context) error { return fmt.Errorf("could not sync object counters: %w", err) } + success = true + s.metricsWriter.SetRefillPercent(path, 100) + return nil +} + +func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, descriptor []byte) error { + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + s.log.Warn(logs.ShardCouldNotUnmarshalObject, + zap.Stringer("address", addr), + zap.String("err", err.Error())) + return nil + } + + var err error + switch obj.Type() { + case objectSDK.TypeTombstone: + err = s.refillTombstoneObject(ctx, obj) + case objectSDK.TypeLock: + err = s.refillLockObject(ctx, obj) + default: + } + if err != nil { + return err + } + + var mPrm meta.PutPrm + mPrm.SetObject(obj) + mPrm.SetStorageID(descriptor) + + _, err = s.metaBase.Put(ctx, mPrm) + if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { + return err + } return nil } diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index d08747e1..44fee163 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -126,11 +126,15 @@ func TestRefillMetabaseCorrupted(t *testing.T) { }), } + mm := NewMetricStore() + sh := New( WithID(NewIDFromBytes([]byte{})), WithBlobStorOptions(blobOpts...), WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), - WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{}))) + WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})), + WithMetricsWriter(mm), + ) require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) @@ -157,7 +161,8 @@ func TestRefillMetabaseCorrupted(t *testing.T) { WithBlobStorOptions(blobOpts...), WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})), - WithRefillMetabase(true)) + WithRefillMetabase(true), + WithMetricsWriter(mm)) require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) @@ -185,6 +190,8 @@ func TestRefillMetabase(t *testing.T) { }), } + mm := NewMetricStore() + sh := New( WithID(NewIDFromBytes([]byte{})), WithBlobStorOptions(blobOpts...), @@ -194,6 +201,7 @@ func TestRefillMetabase(t *testing.T) { ), WithPiloramaOptions( pilorama.WithPath(filepath.Join(p, "pilorama"))), + WithMetricsWriter(mm), ) // open Blobstor @@ -362,6 +370,7 @@ func TestRefillMetabase(t *testing.T) { ), WithPiloramaOptions( pilorama.WithPath(filepath.Join(p, "pilorama_another"))), + WithMetricsWriter(mm), ) // open Blobstor @@ -389,4 +398,7 @@ func TestRefillMetabase(t *testing.T) { checkObj(object.AddressOf(tombObj), tombObj) checkTombMembers(true) checkLocked(t, cnrLocked, locked) + require.Equal(t, int64(len(mObjs)+2), mm.refillCount) // 1 lock + 1 tomb + require.Equal(t, "completed", mm.refillStatus) + require.Equal(t, uint32(100), mm.refillPercent) } diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index 2ab99eed..38d465f3 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -21,13 +21,28 @@ import ( ) type metricsStore struct { - mtx sync.Mutex - objCounters map[string]uint64 - cnrSize map[string]int64 - cnrCount map[string]uint64 - pldSize int64 - mode mode.Mode - errCounter int64 + mtx sync.Mutex + objCounters map[string]uint64 + cnrSize map[string]int64 + cnrCount map[string]uint64 + pldSize int64 + mode mode.Mode + errCounter int64 + refillCount int64 + refillSize int64 + refillPercent uint32 + refillStatus string +} + +func NewMetricStore() *metricsStore { + return &metricsStore{ + objCounters: map[string]uint64{ + "phy": 0, + "logic": 0, + }, + cnrSize: make(map[string]int64), + cnrCount: make(map[string]uint64), + } } func (m *metricsStore) SetShardID(_ string) {} @@ -155,6 +170,28 @@ func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool return v, ok } +func (m *metricsStore) IncRefillObjectsCount(_ string, size int, success bool) { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.refillCount++ + m.refillSize += int64(size) +} + +func (m *metricsStore) SetRefillPercent(_ string, percent uint32) { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.refillPercent = percent +} + +func (m *metricsStore) SetRefillStatus(_ string, status string) { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.refillStatus = status +} + func TestCounters(t *testing.T) { t.Parallel() @@ -361,14 +398,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) { }), } - mm := &metricsStore{ - objCounters: map[string]uint64{ - "phy": 0, - "logic": 0, - }, - cnrSize: make(map[string]int64), - cnrCount: make(map[string]uint64), - } + mm := NewMetricStore() sh := New( WithID(NewIDFromBytes([]byte{})), diff --git a/pkg/local_object_storage/shard/reload_test.go b/pkg/local_object_storage/shard/reload_test.go index 511ce072..b5ea2fec 100644 --- a/pkg/local_object_storage/shard/reload_test.go +++ b/pkg/local_object_storage/shard/reload_test.go @@ -51,6 +51,7 @@ func TestShardReload(t *testing.T) { WithMetaBaseOptions(metaOpts...), WithPiloramaOptions( pilorama.WithPath(filepath.Join(p, "pilorama"))), + WithMetricsWriter(NewMetricStore()), } sh := New(opts...) diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 81d85437..d9cd2b2f 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -90,6 +90,12 @@ type MetricsWriter interface { IncContainerObjectsCount(cnrID string, objectType string) // SubContainerObjectsCount subtracts container object count. SubContainerObjectsCount(cnrID string, objectType string, value uint64) + // IncRefillObjectsCount increments refill objects count. + IncRefillObjectsCount(path string, size int, success bool) + // SetRefillPercent sets refill percent. + SetRefillPercent(path string, percent uint32) + // SetRefillStatus sets refill status. + SetRefillStatus(path string, status string) } type cfg struct { diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 4cc54247..e37777e4 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -1,6 +1,7 @@ package metrics import ( + "strconv" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" @@ -23,6 +24,9 @@ type EngineMetrics interface { SetContainerObjectCounter(shardID, contID, objectType string, v uint64) IncContainerObjectCounter(shardID, contID, objectType string) SubContainerObjectCounter(shardID, contID, objectType string, v uint64) + IncRefillObjectsCount(shardID, path string, size int, success bool) + SetRefillPercent(shardID, path string, percent uint32) + SetRefillStatus(shardID, path, status string) WriteCache() WriteCacheMetrics GC() GCMetrics @@ -37,6 +41,11 @@ type engineMetrics struct { mode *shardIDModeValue contObjCounter *prometheus.GaugeVec + refillStatus *shardIDPathModeValue + refillObjCounter *prometheus.GaugeVec + refillPayloadCounter *prometheus.GaugeVec + refillPercentCounter *prometheus.GaugeVec + gc *gcMetrics writeCache *writeCacheMetrics } @@ -55,10 +64,14 @@ func newEngineMetrics() *engineMetrics { objectCounter: newEngineGaugeVector("objects_total", "Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.", []string{shardIDLabel, typeLabel}), - gc: newGCMetrics(), - writeCache: newWriteCacheMetrics(), - mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"), - contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}), + gc: newGCMetrics(), + writeCache: newWriteCacheMetrics(), + mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"), + contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}), + refillStatus: newShardIDPathMode(engineSubsystem, "resync_metabase_status", "Resync from blobstore to metabase status"), + refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}), + refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}), + refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}), } } @@ -106,7 +119,11 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) { m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID}) m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) + m.refillObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) + m.refillPayloadCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) + m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) m.mode.Delete(shardID) + m.refillStatus.DeleteByShardID(shardID) } func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) { @@ -168,3 +185,31 @@ func (m *engineMetrics) WriteCache() WriteCacheMetrics { func (m *engineMetrics) GC() GCMetrics { return m.gc } + +func (m *engineMetrics) IncRefillObjectsCount(shardID, path string, size int, success bool) { + m.refillObjCounter.With( + prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + successLabel: strconv.FormatBool(success), + }, + ).Inc() + m.refillPayloadCounter.With( + prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + successLabel: strconv.FormatBool(success), + }, + ).Add(float64(size)) +} + +func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) { + m.refillPercentCounter.With(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }).Set(float64(percent)) +} + +func (m *engineMetrics) SetRefillStatus(shardID, path, status string) { + m.refillStatus.SetMode(shardID, path, status) +} diff --git a/pkg/metrics/mode.go b/pkg/metrics/mode.go index 312a6b33..cf6bdac4 100644 --- a/pkg/metrics/mode.go +++ b/pkg/metrics/mode.go @@ -74,6 +74,12 @@ func (m *shardIDPathModeValue) Delete(shardID, path string) { }) } +func (m *shardIDPathModeValue) DeleteByShardID(shardID string) { + m.modeValue.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + }) +} + func modeFromBool(readOnly bool) string { modeValue := readWriteMode if readOnly {