From 1005bf4f56d98cf09b285ee56ba1d447c508ffd8 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 5 Mar 2024 13:41:15 +0300 Subject: [PATCH 1/4] [#1024] shard: Add refill metabase benchmark Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/shard/refill_test.go | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 pkg/local_object_storage/shard/refill_test.go diff --git a/pkg/local_object_storage/shard/refill_test.go b/pkg/local_object_storage/shard/refill_test.go new file mode 100644 index 00000000..07ec9180 --- /dev/null +++ b/pkg/local_object_storage/shard/refill_test.go @@ -0,0 +1,72 @@ +package shard + +import ( + "context" + "os" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" + oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" + "github.com/stretchr/testify/require" +) + +func BenchmarkRefillMetabase(b *testing.B) { + b.Run("100 objects", func(b *testing.B) { + benchRefillMetabase(b, 100) + }) + + b.Run("1000 objects", func(b *testing.B) { + benchRefillMetabase(b, 1000) + }) + + b.Run("2000 objects", func(b *testing.B) { + benchRefillMetabase(b, 2000) + }) + + b.Run("5000 objects", func(b *testing.B) { + benchRefillMetabase(b, 5000) + }) +} + +func benchRefillMetabase(b *testing.B, objectsCount int) { + sh := newShard(b, false) + defer func() { require.NoError(b, sh.Close()) }() + + var putPrm PutPrm + + for i := 0; i < objectsCount/2; i++ { + obj := testutil.GenerateObject() + testutil.AddAttribute(obj, "foo", "bar") + testutil.AddPayload(obj, 1<<5) // blobvnicza tree obj + + putPrm.SetObject(obj) + + _, err := sh.Put(context.Background(), putPrm) + require.NoError(b, err) + } + + for i := 0; i < objectsCount/2; i++ { + obj := testutil.GenerateObject() + testutil.AddAttribute(obj, "foo", "bar") + obj.SetID(oidtest.ID()) + testutil.AddPayload(obj, 1<<20) // fstree obj + + putPrm.SetObject(obj) + + _, err := sh.Put(context.Background(), putPrm) + require.NoError(b, err) + } + + require.NoError(b, sh.Close()) + require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path)) + + require.NoError(b, sh.Open(context.Background())) + sh.cfg.refillMetabase = true + + b.ReportAllocs() + b.ResetTimer() + + require.NoError(b, sh.Init(context.Background())) + + require.NoError(b, sh.Close()) +} -- 2.40.1 From 57466594fbef35d014479e7d96ccd17db438fef5 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 5 Mar 2024 15:39:50 +0300 Subject: [PATCH 2/4] [#1024] shard: Resync metabase concurrently Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 11 ++- cmd/frostfs-node/config/engine/config_test.go | 2 + .../config/engine/shard/config.go | 15 ++++ config/example/node.env | 1 + config/example/node.json | 1 + config/example/node.yaml | 1 + docs/storage-node-configuration.md | 9 ++- pkg/local_object_storage/shard/control.go | 76 ++++++++++++------- pkg/local_object_storage/shard/refill_test.go | 6 +- pkg/local_object_storage/shard/shard.go | 10 ++- 10 files changed, 93 insertions(+), 39 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index d78a90cf..c5ed8b50 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -119,10 +119,11 @@ type shardCfg struct { estimateCompressibility bool estimateCompressibilityThreshold float64 - smallSizeObjectLimit uint64 - uncompressableContentType []string - refillMetabase bool - mode shardmode.Mode + smallSizeObjectLimit uint64 + uncompressableContentType []string + refillMetabase bool + refillMetabaseWorkersCount int + mode shardmode.Mode metaCfg struct { path string @@ -234,6 +235,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig var newConfig shardCfg newConfig.refillMetabase = oldConfig.RefillMetabase() + newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount() newConfig.mode = oldConfig.Mode() newConfig.compress = oldConfig.Compress() newConfig.estimateCompressibility = oldConfig.EstimateCompressibility() @@ -986,6 +988,7 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID { sh.shOpts = []shard.Option{ shard.WithLogger(c.log), shard.WithRefillMetabase(shCfg.refillMetabase), + shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount), shard.WithMode(shCfg.mode), shard.WithBlobStorOptions(blobstoreOpts...), shard.WithMetaBaseOptions(mbOptions...), diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index 86c93830..b5c926fc 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -117,6 +117,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, false, sc.RefillMetabase()) require.Equal(t, mode.ReadOnly, sc.Mode()) + require.Equal(t, 100, sc.RefillMetabaseWorkersCount()) case 1: require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path()) require.Equal(t, fs.FileMode(0o644), pl.Perm()) @@ -168,6 +169,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, true, sc.RefillMetabase()) require.Equal(t, mode.ReadWrite, sc.Mode()) + require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount()) } return nil }) diff --git a/cmd/frostfs-node/config/engine/shard/config.go b/cmd/frostfs-node/config/engine/shard/config.go index f9cfdf2a..0620c9f6 100644 --- a/cmd/frostfs-node/config/engine/shard/config.go +++ b/cmd/frostfs-node/config/engine/shard/config.go @@ -18,6 +18,7 @@ const ( // SmallSizeLimitDefault is a default limit of small objects payload in bytes. SmallSizeLimitDefault = 1 << 20 EstimateCompressibilityThresholdDefault = 0.1 + RefillMetabaseWorkersCountDefault = 500 ) // From wraps config section into Config. @@ -134,6 +135,20 @@ func (x *Config) RefillMetabase() bool { ) } +// RefillMetabaseWorkersCount returns the value of "resync_metabase_worker_count" config parameter. +// +// Returns RefillMetabaseWorkersCountDefault if the value is not a positive number. +func (x *Config) RefillMetabaseWorkersCount() int { + v := config.IntSafe( + (*config.Config)(x), + "resync_metabase_worker_count", + ) + if v > 0 { + return int(v) + } + return RefillMetabaseWorkersCountDefault +} + // Mode return the value of "mode" config parameter. // // Panics if read the value is not one of predefined diff --git a/config/example/node.env b/config/example/node.env index 976c4262..9068886d 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -98,6 +98,7 @@ FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000 ## 0 shard ### Flag to refill Metabase from BlobStor FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false +FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100 ### Flag to set shard mode FROSTFS_STORAGE_SHARD_0_MODE=read-only ### Write cache config diff --git a/config/example/node.json b/config/example/node.json index 648fb77b..bae28720 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -144,6 +144,7 @@ "0": { "mode": "read-only", "resync_metabase": false, + "resync_metabase_worker_count": 100, "writecache": { "enabled": false, "no_sync": true, diff --git a/config/example/node.yaml b/config/example/node.yaml index 2dcf7c4d..7a824adb 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -167,6 +167,7 @@ storage: # degraded-read-only # disabled (do not work with the shard, allows to not remove it from the config) resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding + resync_metabase_worker_count: 100 writecache: enabled: false diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 02ead302..aa3a1781 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -179,14 +179,15 @@ Contains configuration for each shard. Keys must be consecutive numbers starting `default` subsection has the same format and specifies defaults for missing values. The following table describes configuration for each shard. -| Parameter | Type | Default value | Description | -|-------------------------------------|---------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Parameter | Type | Default value | Description | +| ------------------------------------------------ | ------------------------------------------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | `compress` | `bool` | `false` | Flag to enable compression. | | `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). | -| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. | -| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. | +| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. | +| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. | | `mode` | `string` | `read-write` | Shard Mode.
Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` | | `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. | +| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. | | `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. | | `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. | | `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. | diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 774fe804..d080e291 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -15,6 +15,7 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) func (s *Shard) handleMetabaseFailure(stage string, err error) error { @@ -185,39 +186,56 @@ func (s *Shard) refillMetabase(ctx context.Context) error { return fmt.Errorf("could not reset metabase: %w", err) } - obj := objectSDK.New() + eg, egCtx := errgroup.WithContext(ctx) + if s.cfg.refillMetabaseWorkersCount > 0 { + eg.SetLimit(s.cfg.refillMetabaseWorkersCount) + } - err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { - if err := obj.Unmarshal(data); err != nil { - s.log.Warn(logs.ShardCouldNotUnmarshalObject, - zap.Stringer("address", addr), - zap.String("err", err.Error())) + itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { + eg.Go(func() error { + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + s.log.Warn(logs.ShardCouldNotUnmarshalObject, + zap.Stringer("address", addr), + zap.String("err", err.Error())) + return nil + } + + var err error + switch obj.Type() { + case objectSDK.TypeTombstone: + err = s.refillTombstoneObject(egCtx, obj) + case objectSDK.TypeLock: + err = s.refillLockObject(egCtx, obj) + default: + } + if err != nil { + return err + } + + var mPrm meta.PutPrm + mPrm.SetObject(obj) + mPrm.SetStorageID(descriptor) + + _, err = s.metaBase.Put(egCtx, mPrm) + if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { + return err + } + + return nil + }) + + select { + case <-egCtx.Done(): + return egCtx.Err() + default: return nil } - - var err error - switch obj.Type() { - case objectSDK.TypeTombstone: - err = s.refillTombstoneObject(ctx, obj) - case objectSDK.TypeLock: - err = s.refillLockObject(ctx, obj) - default: - } - if err != nil { - return err - } - - var mPrm meta.PutPrm - mPrm.SetObject(obj) - mPrm.SetStorageID(descriptor) - - _, err = s.metaBase.Put(ctx, mPrm) - if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { - return err - } - - return nil }) + + egErr := eg.Wait() + + err = errors.Join(egErr, itErr) if err != nil { return fmt.Errorf("could not put objects to the meta: %w", err) } diff --git a/pkg/local_object_storage/shard/refill_test.go b/pkg/local_object_storage/shard/refill_test.go index 07ec9180..509ccaaa 100644 --- a/pkg/local_object_storage/shard/refill_test.go +++ b/pkg/local_object_storage/shard/refill_test.go @@ -5,6 +5,7 @@ import ( "os" "testing" + shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" @@ -29,7 +30,10 @@ func BenchmarkRefillMetabase(b *testing.B) { } func benchRefillMetabase(b *testing.B, objectsCount int) { - sh := newShard(b, false) + sh := newCustomShard(b, false, shardOptions{ + additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)}, + }) + defer func() { require.NoError(b, sh.Close()) }() var putPrm PutPrm diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 8368f6db..81d85437 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -95,7 +95,8 @@ type MetricsWriter interface { type cfg struct { m sync.RWMutex - refillMetabase bool + refillMetabase bool + refillMetabaseWorkersCount int rmBatchSize int @@ -300,6 +301,13 @@ func WithRefillMetabase(v bool) Option { } } +// WithRefillMetabaseWorkersCount returns option to set count of workers to refill the Metabase on Shard's initialization step. +func WithRefillMetabaseWorkersCount(v int) Option { + return func(c *cfg) { + c.refillMetabaseWorkersCount = v + } +} + // WithMode returns option to set shard's mode. Mode must be one of the predefined: // - mode.ReadWrite; // - mode.ReadOnly. -- 2.40.1 From e3d9dd6ee8134107926cf4cf58d41899b1eebf96 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 5 Mar 2024 16:08:49 +0300 Subject: [PATCH 3/4] [#1024] blobovnicza: Copy data on iterate DB value is only valid while the tx is alive. But handler may to run something in other goroutine. Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/blobovnicza/iterate.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/local_object_storage/blobovnicza/iterate.go b/pkg/local_object_storage/blobovnicza/iterate.go index 32b0ccea..b1cb9192 100644 --- a/pkg/local_object_storage/blobovnicza/iterate.go +++ b/pkg/local_object_storage/blobovnicza/iterate.go @@ -1,6 +1,7 @@ package blobovnicza import ( + "bytes" "context" "fmt" "math" @@ -158,7 +159,7 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes, } if !prm.withoutData { - elem.data = v + elem.data = bytes.Clone(v) } return prm.handler(elem) -- 2.40.1 From 1b17258c042d3b1f1b4d2bdb530cbb5cd1560139 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 12 Mar 2024 17:36:26 +0300 Subject: [PATCH 4/4] [#1029] metabase: Add refill metrics Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 2 + .../blobovnicza/control.go | 4 + .../blobstor/blobovniczatree/count.go | 38 ++++++++ .../blobstor/blobovniczatree/metrics.go | 2 + .../blobstor/common/storage.go | 1 + .../blobstor/fstree/fstree.go | 39 ++++++++ .../blobstor/fstree/metrics.go | 2 + pkg/local_object_storage/blobstor/info.go | 45 +++++++++ .../blobstor/memstore/memstore.go | 7 ++ pkg/local_object_storage/blobstor/metrics.go | 2 + .../blobstor/teststore/teststore.go | 7 ++ pkg/local_object_storage/engine/metrics.go | 4 + pkg/local_object_storage/engine/shards.go | 12 +++ .../metrics/blobovnicza.go | 4 + pkg/local_object_storage/metrics/blobstore.go | 4 + pkg/local_object_storage/metrics/fstree.go | 4 + pkg/local_object_storage/shard/control.go | 94 ++++++++++++++----- .../shard/control_test.go | 16 +++- .../shard/metrics_test.go | 60 +++++++++--- pkg/local_object_storage/shard/reload_test.go | 1 + pkg/local_object_storage/shard/shard.go | 6 ++ pkg/metrics/engine.go | 53 ++++++++++- pkg/metrics/mode.go | 6 ++ 23 files changed, 366 insertions(+), 47 deletions(-) create mode 100644 pkg/local_object_storage/blobstor/blobovniczatree/count.go diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 64e5f771..d56d425c 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -579,6 +579,8 @@ const ( EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node" EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node" EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node" + EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available" + BlobstoreFailedToGetFileinfo = "failed to get file info" ECFailedToSendToContainerNode = "failed to send EC object to container node" ECFailedToSaveECPart = "failed to save EC part" ) diff --git a/pkg/local_object_storage/blobovnicza/control.go b/pkg/local_object_storage/blobovnicza/control.go index dab30164..aeaa4e1d 100644 --- a/pkg/local_object_storage/blobovnicza/control.go +++ b/pkg/local_object_storage/blobovnicza/control.go @@ -101,6 +101,10 @@ func (b *Blobovnicza) Init() error { return b.initializeCounters() } +func (b *Blobovnicza) ObjectsCount() uint64 { + return b.itemsCount.Load() +} + func (b *Blobovnicza) initializeCounters() error { var size uint64 var items uint64 diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/count.go b/pkg/local_object_storage/blobstor/blobovniczatree/count.go new file mode 100644 index 00000000..cf91637d --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobovniczatree/count.go @@ -0,0 +1,38 @@ +package blobovniczatree + +import ( + "context" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" +) + +func (b *Blobovniczas) ObjectsCount(ctx context.Context) (uint64, error) { + var ( + success bool + startedAt = time.Now() + ) + defer func() { + b.metrics.ObjectsCount(time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.ObjectsCount") + defer span.End() + + var result uint64 + err := b.iterateExistingDBPaths(ctx, func(p string) (bool, error) { + shDB := b.getBlobovniczaWithoutCaching(p) + blz, err := shDB.Open() + if err != nil { + return true, err + } + defer shDB.Close() + + result += blz.ObjectsCount() + return false, nil + }) + if err != nil { + return 0, err + } + return result, nil +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/metrics.go b/pkg/local_object_storage/blobstor/blobovniczatree/metrics.go index ed05f877..28289c19 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/metrics.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/metrics.go @@ -24,6 +24,7 @@ type Metrics interface { SetRebuildStatus(status string) ObjectMoved(d time.Duration) SetRebuildPercent(value uint32) + ObjectsCount(d time.Duration, success bool) Delete(d time.Duration, success, withStorageID bool) Exists(d time.Duration, success, withStorageID bool) @@ -47,6 +48,7 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {} func (m *noopMetrics) Get(time.Duration, int, bool, bool) {} func (m *noopMetrics) Iterate(time.Duration, bool) {} func (m *noopMetrics) Put(time.Duration, int, bool) {} +func (m *noopMetrics) ObjectsCount(time.Duration, bool) {} func (m *noopMetrics) Blobovnicza() blobovnicza.Metrics { return &blobovnicza.NoopMetrics{} } diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index e552fafe..8f629b1d 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -15,6 +15,7 @@ type Storage interface { Type() string Path() string + ObjectsCount(ctx context.Context) (uint64, error) SetCompressor(cc *compression.Config) Compressor() *compression.Config diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 420f341a..7a064af6 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -467,6 +467,45 @@ func (t *FSTree) countFiles() (uint64, error) { return counter, nil } +func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + t.metrics.ObjectsCount(time.Since(startedAt), success) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.ObjectsCount", + trace.WithAttributes( + attribute.String("path", t.RootPath), + )) + defer span.End() + + var result uint64 + + err := filepath.WalkDir(t.RootPath, + func(_ string, d fs.DirEntry, _ error) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if !d.IsDir() { + result++ + } + + return nil + }, + ) + if err != nil { + return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err) + } + success = true + return result, nil +} + // Type is fstree storage type used in logs and configuration. const Type = "fstree" diff --git a/pkg/local_object_storage/blobstor/fstree/metrics.go b/pkg/local_object_storage/blobstor/fstree/metrics.go index ca6a5497..eeb68667 100644 --- a/pkg/local_object_storage/blobstor/fstree/metrics.go +++ b/pkg/local_object_storage/blobstor/fstree/metrics.go @@ -14,6 +14,7 @@ type Metrics interface { Put(d time.Duration, size int, success bool) Get(d time.Duration, size int, success bool) GetRange(d time.Duration, size int, success bool) + ObjectsCount(d time.Duration, success bool) } type noopMetrics struct{} @@ -27,3 +28,4 @@ func (m *noopMetrics) Exists(time.Duration, bool) {} func (m *noopMetrics) Put(time.Duration, int, bool) {} func (m *noopMetrics) Get(time.Duration, int, bool) {} func (m *noopMetrics) GetRange(time.Duration, int, bool) {} +func (m *noopMetrics) ObjectsCount(time.Duration, bool) {} diff --git a/pkg/local_object_storage/blobstor/info.go b/pkg/local_object_storage/blobstor/info.go index 2fd62af8..8a5bb870 100644 --- a/pkg/local_object_storage/blobstor/info.go +++ b/pkg/local_object_storage/blobstor/info.go @@ -1,5 +1,14 @@ package blobstor +import ( + "context" + "sync/atomic" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" + "golang.org/x/sync/errgroup" +) + // DumpInfo returns information about blob stor. func (b *BlobStor) DumpInfo() Info { b.modeMtx.RLock() @@ -15,3 +24,39 @@ func (b *BlobStor) DumpInfo() Info { SubStorages: sub, } } + +// ObjectsCount returns Blobstore's total objects count. +func (b *BlobStor) ObjectsCount(ctx context.Context) (uint64, error) { + var err error + startedAt := time.Now() + defer func() { + b.metrics.ObjectsCount(time.Since(startedAt), err == nil) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.ObjectsCount") + defer span.End() + + b.modeMtx.RLock() + defer b.modeMtx.RUnlock() + + var result atomic.Uint64 + + eg, egCtx := errgroup.WithContext(ctx) + for i := range b.storage { + i := i + eg.Go(func() error { + v, e := b.storage[i].Storage.ObjectsCount(egCtx) + if e != nil { + return e + } + result.Add(v) + return nil + }) + } + + if err = eg.Wait(); err != nil { + return 0, err + } + + return result.Load(), nil +} diff --git a/pkg/local_object_storage/blobstor/memstore/memstore.go b/pkg/local_object_storage/blobstor/memstore/memstore.go index 39bed00b..0252c798 100644 --- a/pkg/local_object_storage/blobstor/memstore/memstore.go +++ b/pkg/local_object_storage/blobstor/memstore/memstore.go @@ -163,3 +163,10 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) { return common.RebuildRes{}, nil } + +func (s *memstoreImpl) ObjectsCount(_ context.Context) (uint64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + return uint64(len(s.objs)), nil +} diff --git a/pkg/local_object_storage/blobstor/metrics.go b/pkg/local_object_storage/blobstor/metrics.go index 4a7b4009..aadc237a 100644 --- a/pkg/local_object_storage/blobstor/metrics.go +++ b/pkg/local_object_storage/blobstor/metrics.go @@ -13,6 +13,7 @@ type Metrics interface { Get(d time.Duration, size int, success, withStorageID bool) Iterate(d time.Duration, success bool) Put(d time.Duration, size int, success bool) + ObjectsCount(d time.Duration, success bool) } type noopMetrics struct{} @@ -26,3 +27,4 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {} func (m *noopMetrics) Get(time.Duration, int, bool, bool) {} func (m *noopMetrics) Iterate(time.Duration, bool) {} func (m *noopMetrics) Put(time.Duration, int, bool) {} +func (m *noopMetrics) ObjectsCount(time.Duration, bool) {} diff --git a/pkg/local_object_storage/blobstor/teststore/teststore.go b/pkg/local_object_storage/blobstor/teststore/teststore.go index c0cdfacf..016fd520 100644 --- a/pkg/local_object_storage/blobstor/teststore/teststore.go +++ b/pkg/local_object_storage/blobstor/teststore/teststore.go @@ -233,3 +233,10 @@ func (s *TestStore) SetParentID(string) {} func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) { return common.RebuildRes{}, nil } + +func (s *TestStore) ObjectsCount(ctx context.Context) (uint64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.st.ObjectsCount(ctx) +} diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index 91dfa876..6b99dff7 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -27,6 +27,10 @@ type MetricRegister interface { IncContainerObjectCounter(shardID, contID, objectType string) SubContainerObjectCounter(shardID, contID, objectType string, v uint64) + IncRefillObjectsCount(shardID, path string, size int, success bool) + SetRefillPercent(shardID, path string, percent uint32) + SetRefillStatus(shardID, path, status string) + WriteCache() metrics.WriteCacheMetrics GC() metrics.GCMetrics } diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index ccf2e9da..54d3e7d6 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -85,6 +85,18 @@ func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value) } +func (m *metricsWithID) IncRefillObjectsCount(path string, size int, success bool) { + m.mw.IncRefillObjectsCount(m.id, path, size, success) +} + +func (m *metricsWithID) SetRefillPercent(path string, percent uint32) { + m.mw.SetRefillPercent(m.id, path, percent) +} + +func (m *metricsWithID) SetRefillStatus(path string, status string) { + m.mw.SetRefillStatus(m.id, path, status) +} + // AddShard adds a new shard to the storage engine. // // Returns any error encountered that did not allow adding a shard. diff --git a/pkg/local_object_storage/metrics/blobovnicza.go b/pkg/local_object_storage/metrics/blobovnicza.go index 3a92a7bc..1e294efa 100644 --- a/pkg/local_object_storage/metrics/blobovnicza.go +++ b/pkg/local_object_storage/metrics/blobovnicza.go @@ -87,6 +87,10 @@ func (m *blobovniczaTreeMetrics) Put(d time.Duration, size int, success bool) { } } +func (m *blobovniczaTreeMetrics) ObjectsCount(d time.Duration, success bool) { + m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "ObjectsCount", d, success, metrics_impl.NullBool{}) +} + type blobovniczaMetrics struct { m metrics_impl.BlobobvnizcaMetrics shardID func() string diff --git a/pkg/local_object_storage/metrics/blobstore.go b/pkg/local_object_storage/metrics/blobstore.go index 48249e89..b3871dfc 100644 --- a/pkg/local_object_storage/metrics/blobstore.go +++ b/pkg/local_object_storage/metrics/blobstore.go @@ -63,3 +63,7 @@ func (m *blobstoreMetrics) Put(d time.Duration, size int, success bool) { m.m.AddPut(m.shardID, size) } } + +func (m *blobstoreMetrics) ObjectsCount(d time.Duration, success bool) { + m.m.MethodDuration(m.shardID, "ObjectsCount", d, success, metrics_impl.NullBool{}) +} diff --git a/pkg/local_object_storage/metrics/fstree.go b/pkg/local_object_storage/metrics/fstree.go index 0def3210..e035b3a4 100644 --- a/pkg/local_object_storage/metrics/fstree.go +++ b/pkg/local_object_storage/metrics/fstree.go @@ -65,3 +65,7 @@ func (m *fstreeMetrics) GetRange(d time.Duration, size int, success bool) { m.m.AddGet(m.shardID, m.path, size) } } + +func (m *fstreeMetrics) ObjectsCount(d time.Duration, success bool) { + m.m.MethodDuration(m.shardID, m.path, "ObjectsCount", d, success) +} diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index d080e291..6712822a 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -181,47 +182,54 @@ func (s *Shard) Init(ctx context.Context) error { } func (s *Shard) refillMetabase(ctx context.Context) error { + path := s.metaBase.DumpInfo().Path + s.metricsWriter.SetRefillStatus(path, "running") + s.metricsWriter.SetRefillPercent(path, 0) + var success bool + defer func() { + if success { + s.metricsWriter.SetRefillStatus(path, "completed") + } else { + s.metricsWriter.SetRefillStatus(path, "failed") + } + }() + err := s.metaBase.Reset() if err != nil { return fmt.Errorf("could not reset metabase: %w", err) } + withCount := true + totalObjects, err := s.blobStor.ObjectsCount(ctx) + if err != nil { + s.log.Warn(logs.EngineRefillFailedToGetObjectsCount, zap.Error(err)) + withCount = false + } + eg, egCtx := errgroup.WithContext(ctx) if s.cfg.refillMetabaseWorkersCount > 0 { eg.SetLimit(s.cfg.refillMetabaseWorkersCount) } + var completedCount uint64 + var metricGuard sync.Mutex itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { eg.Go(func() error { - obj := objectSDK.New() - if err := obj.Unmarshal(data); err != nil { - s.log.Warn(logs.ShardCouldNotUnmarshalObject, - zap.Stringer("address", addr), - zap.String("err", err.Error())) - return nil - } + var success bool + defer func() { + s.metricsWriter.IncRefillObjectsCount(path, len(data), success) + if withCount { + metricGuard.Lock() + completedCount++ + s.metricsWriter.SetRefillPercent(path, uint32(completedCount*100/totalObjects)) + metricGuard.Unlock() + } + }() - var err error - switch obj.Type() { - case objectSDK.TypeTombstone: - err = s.refillTombstoneObject(egCtx, obj) - case objectSDK.TypeLock: - err = s.refillLockObject(egCtx, obj) - default: - } - if err != nil { + if err := s.refillObject(egCtx, data, addr, descriptor); err != nil { return err } - - var mPrm meta.PutPrm - mPrm.SetObject(obj) - mPrm.SetStorageID(descriptor) - - _, err = s.metaBase.Put(egCtx, mPrm) - if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { - return err - } - + success = true return nil }) @@ -245,6 +253,40 @@ func (s *Shard) refillMetabase(ctx context.Context) error { return fmt.Errorf("could not sync object counters: %w", err) } + success = true + s.metricsWriter.SetRefillPercent(path, 100) + return nil +} + +func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, descriptor []byte) error { + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + s.log.Warn(logs.ShardCouldNotUnmarshalObject, + zap.Stringer("address", addr), + zap.String("err", err.Error())) + return nil + } + + var err error + switch obj.Type() { + case objectSDK.TypeTombstone: + err = s.refillTombstoneObject(ctx, obj) + case objectSDK.TypeLock: + err = s.refillLockObject(ctx, obj) + default: + } + if err != nil { + return err + } + + var mPrm meta.PutPrm + mPrm.SetObject(obj) + mPrm.SetStorageID(descriptor) + + _, err = s.metaBase.Put(ctx, mPrm) + if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { + return err + } return nil } diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index d08747e1..44fee163 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -126,11 +126,15 @@ func TestRefillMetabaseCorrupted(t *testing.T) { }), } + mm := NewMetricStore() + sh := New( WithID(NewIDFromBytes([]byte{})), WithBlobStorOptions(blobOpts...), WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), - WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{}))) + WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})), + WithMetricsWriter(mm), + ) require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) @@ -157,7 +161,8 @@ func TestRefillMetabaseCorrupted(t *testing.T) { WithBlobStorOptions(blobOpts...), WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})), - WithRefillMetabase(true)) + WithRefillMetabase(true), + WithMetricsWriter(mm)) require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Init(context.Background())) @@ -185,6 +190,8 @@ func TestRefillMetabase(t *testing.T) { }), } + mm := NewMetricStore() + sh := New( WithID(NewIDFromBytes([]byte{})), WithBlobStorOptions(blobOpts...), @@ -194,6 +201,7 @@ func TestRefillMetabase(t *testing.T) { ), WithPiloramaOptions( pilorama.WithPath(filepath.Join(p, "pilorama"))), + WithMetricsWriter(mm), ) // open Blobstor @@ -362,6 +370,7 @@ func TestRefillMetabase(t *testing.T) { ), WithPiloramaOptions( pilorama.WithPath(filepath.Join(p, "pilorama_another"))), + WithMetricsWriter(mm), ) // open Blobstor @@ -389,4 +398,7 @@ func TestRefillMetabase(t *testing.T) { checkObj(object.AddressOf(tombObj), tombObj) checkTombMembers(true) checkLocked(t, cnrLocked, locked) + require.Equal(t, int64(len(mObjs)+2), mm.refillCount) // 1 lock + 1 tomb + require.Equal(t, "completed", mm.refillStatus) + require.Equal(t, uint32(100), mm.refillPercent) } diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index 2ab99eed..38d465f3 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -21,13 +21,28 @@ import ( ) type metricsStore struct { - mtx sync.Mutex - objCounters map[string]uint64 - cnrSize map[string]int64 - cnrCount map[string]uint64 - pldSize int64 - mode mode.Mode - errCounter int64 + mtx sync.Mutex + objCounters map[string]uint64 + cnrSize map[string]int64 + cnrCount map[string]uint64 + pldSize int64 + mode mode.Mode + errCounter int64 + refillCount int64 + refillSize int64 + refillPercent uint32 + refillStatus string +} + +func NewMetricStore() *metricsStore { + return &metricsStore{ + objCounters: map[string]uint64{ + "phy": 0, + "logic": 0, + }, + cnrSize: make(map[string]int64), + cnrCount: make(map[string]uint64), + } } func (m *metricsStore) SetShardID(_ string) {} @@ -155,6 +170,28 @@ func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool return v, ok } +func (m *metricsStore) IncRefillObjectsCount(_ string, size int, success bool) { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.refillCount++ + m.refillSize += int64(size) +} + +func (m *metricsStore) SetRefillPercent(_ string, percent uint32) { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.refillPercent = percent +} + +func (m *metricsStore) SetRefillStatus(_ string, status string) { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.refillStatus = status +} + func TestCounters(t *testing.T) { t.Parallel() @@ -361,14 +398,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) { }), } - mm := &metricsStore{ - objCounters: map[string]uint64{ - "phy": 0, - "logic": 0, - }, - cnrSize: make(map[string]int64), - cnrCount: make(map[string]uint64), - } + mm := NewMetricStore() sh := New( WithID(NewIDFromBytes([]byte{})), diff --git a/pkg/local_object_storage/shard/reload_test.go b/pkg/local_object_storage/shard/reload_test.go index 511ce072..b5ea2fec 100644 --- a/pkg/local_object_storage/shard/reload_test.go +++ b/pkg/local_object_storage/shard/reload_test.go @@ -51,6 +51,7 @@ func TestShardReload(t *testing.T) { WithMetaBaseOptions(metaOpts...), WithPiloramaOptions( pilorama.WithPath(filepath.Join(p, "pilorama"))), + WithMetricsWriter(NewMetricStore()), } sh := New(opts...) diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 81d85437..d9cd2b2f 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -90,6 +90,12 @@ type MetricsWriter interface { IncContainerObjectsCount(cnrID string, objectType string) // SubContainerObjectsCount subtracts container object count. SubContainerObjectsCount(cnrID string, objectType string, value uint64) + // IncRefillObjectsCount increments refill objects count. + IncRefillObjectsCount(path string, size int, success bool) + // SetRefillPercent sets refill percent. + SetRefillPercent(path string, percent uint32) + // SetRefillStatus sets refill status. + SetRefillStatus(path string, status string) } type cfg struct { diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 4cc54247..e37777e4 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -1,6 +1,7 @@ package metrics import ( + "strconv" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" @@ -23,6 +24,9 @@ type EngineMetrics interface { SetContainerObjectCounter(shardID, contID, objectType string, v uint64) IncContainerObjectCounter(shardID, contID, objectType string) SubContainerObjectCounter(shardID, contID, objectType string, v uint64) + IncRefillObjectsCount(shardID, path string, size int, success bool) + SetRefillPercent(shardID, path string, percent uint32) + SetRefillStatus(shardID, path, status string) WriteCache() WriteCacheMetrics GC() GCMetrics @@ -37,6 +41,11 @@ type engineMetrics struct { mode *shardIDModeValue contObjCounter *prometheus.GaugeVec + refillStatus *shardIDPathModeValue + refillObjCounter *prometheus.GaugeVec + refillPayloadCounter *prometheus.GaugeVec + refillPercentCounter *prometheus.GaugeVec + gc *gcMetrics writeCache *writeCacheMetrics } @@ -55,10 +64,14 @@ func newEngineMetrics() *engineMetrics { objectCounter: newEngineGaugeVector("objects_total", "Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.", []string{shardIDLabel, typeLabel}), - gc: newGCMetrics(), - writeCache: newWriteCacheMetrics(), - mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"), - contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}), + gc: newGCMetrics(), + writeCache: newWriteCacheMetrics(), + mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"), + contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}), + refillStatus: newShardIDPathMode(engineSubsystem, "resync_metabase_status", "Resync from blobstore to metabase status"), + refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}), + refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}), + refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}), } } @@ -106,7 +119,11 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) { m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID}) m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) + m.refillObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) + m.refillPayloadCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) + m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) m.mode.Delete(shardID) + m.refillStatus.DeleteByShardID(shardID) } func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) { @@ -168,3 +185,31 @@ func (m *engineMetrics) WriteCache() WriteCacheMetrics { func (m *engineMetrics) GC() GCMetrics { return m.gc } + +func (m *engineMetrics) IncRefillObjectsCount(shardID, path string, size int, success bool) { + m.refillObjCounter.With( + prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + successLabel: strconv.FormatBool(success), + }, + ).Inc() + m.refillPayloadCounter.With( + prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + successLabel: strconv.FormatBool(success), + }, + ).Add(float64(size)) +} + +func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) { + m.refillPercentCounter.With(prometheus.Labels{ + shardIDLabel: shardID, + pathLabel: path, + }).Set(float64(percent)) +} + +func (m *engineMetrics) SetRefillStatus(shardID, path, status string) { + m.refillStatus.SetMode(shardID, path, status) +} diff --git a/pkg/metrics/mode.go b/pkg/metrics/mode.go index 312a6b33..cf6bdac4 100644 --- a/pkg/metrics/mode.go +++ b/pkg/metrics/mode.go @@ -74,6 +74,12 @@ func (m *shardIDPathModeValue) Delete(shardID, path string) { }) } +func (m *shardIDPathModeValue) DeleteByShardID(shardID string) { + m.modeValue.DeletePartialMatch(prometheus.Labels{ + shardIDLabel: shardID, + }) +} + func modeFromBool(readOnly bool) string { modeValue := readWriteMode if readOnly { -- 2.40.1