Speed up metabase refill #1027
32 changed files with 506 additions and 60 deletions
|
@ -119,10 +119,11 @@ type shardCfg struct {
|
||||||
estimateCompressibility bool
|
estimateCompressibility bool
|
||||||
estimateCompressibilityThreshold float64
|
estimateCompressibilityThreshold float64
|
||||||
|
|
||||||
smallSizeObjectLimit uint64
|
smallSizeObjectLimit uint64
|
||||||
uncompressableContentType []string
|
uncompressableContentType []string
|
||||||
refillMetabase bool
|
refillMetabase bool
|
||||||
mode shardmode.Mode
|
refillMetabaseWorkersCount int
|
||||||
|
mode shardmode.Mode
|
||||||
|
|
||||||
metaCfg struct {
|
metaCfg struct {
|
||||||
path string
|
path string
|
||||||
|
@ -234,6 +235,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
|
||||||
var newConfig shardCfg
|
var newConfig shardCfg
|
||||||
|
|
||||||
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
||||||
|
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
||||||
newConfig.mode = oldConfig.Mode()
|
newConfig.mode = oldConfig.Mode()
|
||||||
newConfig.compress = oldConfig.Compress()
|
newConfig.compress = oldConfig.Compress()
|
||||||
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
||||||
|
@ -986,6 +988,7 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
|
||||||
sh.shOpts = []shard.Option{
|
sh.shOpts = []shard.Option{
|
||||||
shard.WithLogger(c.log),
|
shard.WithLogger(c.log),
|
||||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||||
|
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
||||||
shard.WithMode(shCfg.mode),
|
shard.WithMode(shCfg.mode),
|
||||||
shard.WithBlobStorOptions(blobstoreOpts...),
|
shard.WithBlobStorOptions(blobstoreOpts...),
|
||||||
shard.WithMetaBaseOptions(mbOptions...),
|
shard.WithMetaBaseOptions(mbOptions...),
|
||||||
|
|
|
@ -117,6 +117,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, false, sc.RefillMetabase())
|
require.Equal(t, false, sc.RefillMetabase())
|
||||||
require.Equal(t, mode.ReadOnly, sc.Mode())
|
require.Equal(t, mode.ReadOnly, sc.Mode())
|
||||||
|
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
||||||
case 1:
|
case 1:
|
||||||
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
||||||
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
||||||
|
@ -168,6 +169,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, true, sc.RefillMetabase())
|
require.Equal(t, true, sc.RefillMetabase())
|
||||||
require.Equal(t, mode.ReadWrite, sc.Mode())
|
require.Equal(t, mode.ReadWrite, sc.Mode())
|
||||||
|
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -18,6 +18,7 @@ const (
|
||||||
// SmallSizeLimitDefault is a default limit of small objects payload in bytes.
|
// SmallSizeLimitDefault is a default limit of small objects payload in bytes.
|
||||||
SmallSizeLimitDefault = 1 << 20
|
SmallSizeLimitDefault = 1 << 20
|
||||||
EstimateCompressibilityThresholdDefault = 0.1
|
EstimateCompressibilityThresholdDefault = 0.1
|
||||||
|
RefillMetabaseWorkersCountDefault = 500
|
||||||
)
|
)
|
||||||
|
|
||||||
// From wraps config section into Config.
|
// From wraps config section into Config.
|
||||||
|
@ -134,6 +135,20 @@ func (x *Config) RefillMetabase() bool {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RefillMetabaseWorkersCount returns the value of "resync_metabase_worker_count" config parameter.
|
||||||
|
//
|
||||||
|
// Returns RefillMetabaseWorkersCountDefault if the value is not a positive number.
|
||||||
|
func (x *Config) RefillMetabaseWorkersCount() int {
|
||||||
|
v := config.IntSafe(
|
||||||
|
(*config.Config)(x),
|
||||||
|
"resync_metabase_worker_count",
|
||||||
|
)
|
||||||
|
if v > 0 {
|
||||||
|
return int(v)
|
||||||
|
}
|
||||||
|
return RefillMetabaseWorkersCountDefault
|
||||||
|
}
|
||||||
|
|
||||||
// Mode return the value of "mode" config parameter.
|
// Mode return the value of "mode" config parameter.
|
||||||
//
|
//
|
||||||
// Panics if read the value is not one of predefined
|
// Panics if read the value is not one of predefined
|
||||||
|
|
|
@ -98,6 +98,7 @@ FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
|
||||||
## 0 shard
|
## 0 shard
|
||||||
### Flag to refill Metabase from BlobStor
|
### Flag to refill Metabase from BlobStor
|
||||||
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
||||||
|
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100
|
||||||
### Flag to set shard mode
|
### Flag to set shard mode
|
||||||
FROSTFS_STORAGE_SHARD_0_MODE=read-only
|
FROSTFS_STORAGE_SHARD_0_MODE=read-only
|
||||||
### Write cache config
|
### Write cache config
|
||||||
|
|
|
@ -144,6 +144,7 @@
|
||||||
"0": {
|
"0": {
|
||||||
"mode": "read-only",
|
"mode": "read-only",
|
||||||
"resync_metabase": false,
|
"resync_metabase": false,
|
||||||
|
"resync_metabase_worker_count": 100,
|
||||||
"writecache": {
|
"writecache": {
|
||||||
"enabled": false,
|
"enabled": false,
|
||||||
"no_sync": true,
|
"no_sync": true,
|
||||||
|
|
|
@ -167,6 +167,7 @@ storage:
|
||||||
# degraded-read-only
|
# degraded-read-only
|
||||||
# disabled (do not work with the shard, allows to not remove it from the config)
|
# disabled (do not work with the shard, allows to not remove it from the config)
|
||||||
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
|
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
|
||||||
|
resync_metabase_worker_count: 100
|
||||||
|
|
||||||
writecache:
|
writecache:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
|
|
@ -179,14 +179,15 @@ Contains configuration for each shard. Keys must be consecutive numbers starting
|
||||||
`default` subsection has the same format and specifies defaults for missing values.
|
`default` subsection has the same format and specifies defaults for missing values.
|
||||||
The following table describes configuration for each shard.
|
The following table describes configuration for each shard.
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|-------------------------------------|---------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
| ------------------------------------------------ | ------------------------------------------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||||
| `compress` | `bool` | `false` | Flag to enable compression. |
|
| `compress` | `bool` | `false` | Flag to enable compression. |
|
||||||
| `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). |
|
| `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). |
|
||||||
| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. |
|
| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. |
|
||||||
| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. |
|
| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. |
|
||||||
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
||||||
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
||||||
|
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
|
||||||
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
||||||
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
||||||
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
||||||
|
|
|
@ -579,6 +579,8 @@ const (
|
||||||
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
|
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
|
||||||
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
|
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
|
||||||
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
|
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
|
||||||
|
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
|
||||||
|
BlobstoreFailedToGetFileinfo = "failed to get file info"
|
||||||
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
||||||
ECFailedToSaveECPart = "failed to save EC part"
|
ECFailedToSaveECPart = "failed to save EC part"
|
||||||
)
|
)
|
||||||
|
|
|
@ -101,6 +101,10 @@ func (b *Blobovnicza) Init() error {
|
||||||
return b.initializeCounters()
|
return b.initializeCounters()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Blobovnicza) ObjectsCount() uint64 {
|
||||||
|
return b.itemsCount.Load()
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Blobovnicza) initializeCounters() error {
|
func (b *Blobovnicza) initializeCounters() error {
|
||||||
var size uint64
|
var size uint64
|
||||||
var items uint64
|
var items uint64
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package blobovnicza
|
package blobovnicza
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
@ -158,7 +159,7 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
|
||||||
}
|
}
|
||||||
|
|
||||||
if !prm.withoutData {
|
if !prm.withoutData {
|
||||||
elem.data = v
|
elem.data = bytes.Clone(v)
|
||||||
}
|
}
|
||||||
|
|
||||||
return prm.handler(elem)
|
return prm.handler(elem)
|
||||||
|
|
38
pkg/local_object_storage/blobstor/blobovniczatree/count.go
Normal file
38
pkg/local_object_storage/blobstor/blobovniczatree/count.go
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
package blobovniczatree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (b *Blobovniczas) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||||
|
var (
|
||||||
|
success bool
|
||||||
|
startedAt = time.Now()
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
b.metrics.ObjectsCount(time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.ObjectsCount")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
var result uint64
|
||||||
|
err := b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
|
||||||
|
shDB := b.getBlobovniczaWithoutCaching(p)
|
||||||
|
blz, err := shDB.Open()
|
||||||
|
if err != nil {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
defer shDB.Close()
|
||||||
|
|
||||||
|
result += blz.ObjectsCount()
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ type Metrics interface {
|
||||||
SetRebuildStatus(status string)
|
SetRebuildStatus(status string)
|
||||||
ObjectMoved(d time.Duration)
|
ObjectMoved(d time.Duration)
|
||||||
SetRebuildPercent(value uint32)
|
SetRebuildPercent(value uint32)
|
||||||
|
ObjectsCount(d time.Duration, success bool)
|
||||||
|
|
||||||
Delete(d time.Duration, success, withStorageID bool)
|
Delete(d time.Duration, success, withStorageID bool)
|
||||||
Exists(d time.Duration, success, withStorageID bool)
|
Exists(d time.Duration, success, withStorageID bool)
|
||||||
|
@ -47,6 +48,7 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
|
||||||
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
||||||
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
||||||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||||
|
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
|
||||||
func (m *noopMetrics) Blobovnicza() blobovnicza.Metrics {
|
func (m *noopMetrics) Blobovnicza() blobovnicza.Metrics {
|
||||||
return &blobovnicza.NoopMetrics{}
|
return &blobovnicza.NoopMetrics{}
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ type Storage interface {
|
||||||
|
|
||||||
Type() string
|
Type() string
|
||||||
Path() string
|
Path() string
|
||||||
|
ObjectsCount(ctx context.Context) (uint64, error)
|
||||||
|
|
||||||
SetCompressor(cc *compression.Config)
|
SetCompressor(cc *compression.Config)
|
||||||
Compressor() *compression.Config
|
Compressor() *compression.Config
|
||||||
|
|
|
@ -467,6 +467,45 @@ func (t *FSTree) countFiles() (uint64, error) {
|
||||||
return counter, nil
|
return counter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||||
|
var (
|
||||||
|
startedAt = time.Now()
|
||||||
|
success = false
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
t.metrics.ObjectsCount(time.Since(startedAt), success)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.ObjectsCount",
|
||||||
|
trace.WithAttributes(
|
||||||
|
attribute.String("path", t.RootPath),
|
||||||
|
))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
var result uint64
|
||||||
|
|
||||||
|
err := filepath.WalkDir(t.RootPath,
|
||||||
|
func(_ string, d fs.DirEntry, _ error) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if !d.IsDir() {
|
||||||
|
result++
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
||||||
|
}
|
||||||
|
success = true
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Type is fstree storage type used in logs and configuration.
|
// Type is fstree storage type used in logs and configuration.
|
||||||
const Type = "fstree"
|
const Type = "fstree"
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ type Metrics interface {
|
||||||
Put(d time.Duration, size int, success bool)
|
Put(d time.Duration, size int, success bool)
|
||||||
Get(d time.Duration, size int, success bool)
|
Get(d time.Duration, size int, success bool)
|
||||||
GetRange(d time.Duration, size int, success bool)
|
GetRange(d time.Duration, size int, success bool)
|
||||||
|
ObjectsCount(d time.Duration, success bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
type noopMetrics struct{}
|
type noopMetrics struct{}
|
||||||
|
@ -27,3 +28,4 @@ func (m *noopMetrics) Exists(time.Duration, bool) {}
|
||||||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||||
func (m *noopMetrics) Get(time.Duration, int, bool) {}
|
func (m *noopMetrics) Get(time.Duration, int, bool) {}
|
||||||
func (m *noopMetrics) GetRange(time.Duration, int, bool) {}
|
func (m *noopMetrics) GetRange(time.Duration, int, bool) {}
|
||||||
|
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
|
||||||
|
|
|
@ -1,5 +1,14 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
// DumpInfo returns information about blob stor.
|
// DumpInfo returns information about blob stor.
|
||||||
func (b *BlobStor) DumpInfo() Info {
|
func (b *BlobStor) DumpInfo() Info {
|
||||||
b.modeMtx.RLock()
|
b.modeMtx.RLock()
|
||||||
|
@ -15,3 +24,39 @@ func (b *BlobStor) DumpInfo() Info {
|
||||||
SubStorages: sub,
|
SubStorages: sub,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ObjectsCount returns Blobstore's total objects count.
|
||||||
|
func (b *BlobStor) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||||
|
var err error
|
||||||
|
startedAt := time.Now()
|
||||||
|
defer func() {
|
||||||
|
b.metrics.ObjectsCount(time.Since(startedAt), err == nil)
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.ObjectsCount")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
b.modeMtx.RLock()
|
||||||
|
defer b.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
var result atomic.Uint64
|
||||||
|
|
||||||
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
for i := range b.storage {
|
||||||
|
i := i
|
||||||
|
eg.Go(func() error {
|
||||||
|
v, e := b.storage[i].Storage.ObjectsCount(egCtx)
|
||||||
|
if e != nil {
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
result.Add(v)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = eg.Wait(); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result.Load(), nil
|
||||||
|
}
|
||||||
|
|
|
@ -163,3 +163,10 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
|
||||||
func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
|
func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
|
||||||
return common.RebuildRes{}, nil
|
return common.RebuildRes{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *memstoreImpl) ObjectsCount(_ context.Context) (uint64, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
return uint64(len(s.objs)), nil
|
||||||
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ type Metrics interface {
|
||||||
Get(d time.Duration, size int, success, withStorageID bool)
|
Get(d time.Duration, size int, success, withStorageID bool)
|
||||||
Iterate(d time.Duration, success bool)
|
Iterate(d time.Duration, success bool)
|
||||||
Put(d time.Duration, size int, success bool)
|
Put(d time.Duration, size int, success bool)
|
||||||
|
ObjectsCount(d time.Duration, success bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
type noopMetrics struct{}
|
type noopMetrics struct{}
|
||||||
|
@ -26,3 +27,4 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
|
||||||
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
||||||
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
||||||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||||
|
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
|
||||||
|
|
|
@ -233,3 +233,10 @@ func (s *TestStore) SetParentID(string) {}
|
||||||
func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
|
func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
|
||||||
return common.RebuildRes{}, nil
|
return common.RebuildRes{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *TestStore) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
return s.st.ObjectsCount(ctx)
|
||||||
|
}
|
||||||
|
|
|
@ -27,6 +27,10 @@ type MetricRegister interface {
|
||||||
IncContainerObjectCounter(shardID, contID, objectType string)
|
IncContainerObjectCounter(shardID, contID, objectType string)
|
||||||
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||||
|
|
||||||
|
IncRefillObjectsCount(shardID, path string, size int, success bool)
|
||||||
|
SetRefillPercent(shardID, path string, percent uint32)
|
||||||
|
SetRefillStatus(shardID, path, status string)
|
||||||
|
|
||||||
WriteCache() metrics.WriteCacheMetrics
|
WriteCache() metrics.WriteCacheMetrics
|
||||||
GC() metrics.GCMetrics
|
GC() metrics.GCMetrics
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,6 +85,18 @@ func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string
|
||||||
m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
|
m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *metricsWithID) IncRefillObjectsCount(path string, size int, success bool) {
|
||||||
|
m.mw.IncRefillObjectsCount(m.id, path, size, success)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricsWithID) SetRefillPercent(path string, percent uint32) {
|
||||||
|
m.mw.SetRefillPercent(m.id, path, percent)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricsWithID) SetRefillStatus(path string, status string) {
|
||||||
|
m.mw.SetRefillStatus(m.id, path, status)
|
||||||
|
}
|
||||||
|
|
||||||
// AddShard adds a new shard to the storage engine.
|
// AddShard adds a new shard to the storage engine.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that did not allow adding a shard.
|
// Returns any error encountered that did not allow adding a shard.
|
||||||
|
|
|
@ -87,6 +87,10 @@ func (m *blobovniczaTreeMetrics) Put(d time.Duration, size int, success bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *blobovniczaTreeMetrics) ObjectsCount(d time.Duration, success bool) {
|
||||||
|
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "ObjectsCount", d, success, metrics_impl.NullBool{})
|
||||||
|
}
|
||||||
|
|
||||||
type blobovniczaMetrics struct {
|
type blobovniczaMetrics struct {
|
||||||
m metrics_impl.BlobobvnizcaMetrics
|
m metrics_impl.BlobobvnizcaMetrics
|
||||||
shardID func() string
|
shardID func() string
|
||||||
|
|
|
@ -63,3 +63,7 @@ func (m *blobstoreMetrics) Put(d time.Duration, size int, success bool) {
|
||||||
m.m.AddPut(m.shardID, size)
|
m.m.AddPut(m.shardID, size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *blobstoreMetrics) ObjectsCount(d time.Duration, success bool) {
|
||||||
|
m.m.MethodDuration(m.shardID, "ObjectsCount", d, success, metrics_impl.NullBool{})
|
||||||
|
}
|
||||||
|
|
|
@ -65,3 +65,7 @@ func (m *fstreeMetrics) GetRange(d time.Duration, size int, success bool) {
|
||||||
m.m.AddGet(m.shardID, m.path, size)
|
m.m.AddGet(m.shardID, m.path, size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *fstreeMetrics) ObjectsCount(d time.Duration, success bool) {
|
||||||
|
m.m.MethodDuration(m.shardID, m.path, "ObjectsCount", d, success)
|
||||||
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
@ -15,6 +16,7 @@ import (
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
||||||
|
@ -180,44 +182,68 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Shard) refillMetabase(ctx context.Context) error {
|
func (s *Shard) refillMetabase(ctx context.Context) error {
|
||||||
|
path := s.metaBase.DumpInfo().Path
|
||||||
|
s.metricsWriter.SetRefillStatus(path, "running")
|
||||||
|
s.metricsWriter.SetRefillPercent(path, 0)
|
||||||
|
var success bool
|
||||||
|
defer func() {
|
||||||
|
if success {
|
||||||
|
s.metricsWriter.SetRefillStatus(path, "completed")
|
||||||
|
} else {
|
||||||
|
s.metricsWriter.SetRefillStatus(path, "failed")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
err := s.metaBase.Reset()
|
err := s.metaBase.Reset()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not reset metabase: %w", err)
|
return fmt.Errorf("could not reset metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj := objectSDK.New()
|
withCount := true
|
||||||
|
totalObjects, err := s.blobStor.ObjectsCount(ctx)
|
||||||
|
if err != nil {
|
||||||
|
s.log.Warn(logs.EngineRefillFailedToGetObjectsCount, zap.Error(err))
|
||||||
|
withCount = false
|
||||||
|
}
|
||||||
|
|
||||||
err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
if err := obj.Unmarshal(data); err != nil {
|
if s.cfg.refillMetabaseWorkersCount > 0 {
|
||||||
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
eg.SetLimit(s.cfg.refillMetabaseWorkersCount)
|
||||||
zap.Stringer("address", addr),
|
}
|
||||||
zap.String("err", err.Error()))
|
|
||||||
|
var completedCount uint64
|
||||||
|
var metricGuard sync.Mutex
|
||||||
|
itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||||
|
eg.Go(func() error {
|
||||||
|
var success bool
|
||||||
|
defer func() {
|
||||||
|
s.metricsWriter.IncRefillObjectsCount(path, len(data), success)
|
||||||
|
if withCount {
|
||||||
|
metricGuard.Lock()
|
||||||
|
completedCount++
|
||||||
|
s.metricsWriter.SetRefillPercent(path, uint32(completedCount*100/totalObjects))
|
||||||
|
metricGuard.Unlock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err := s.refillObject(egCtx, data, addr, descriptor); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
success = true
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-egCtx.Done():
|
||||||
|
return egCtx.Err()
|
||||||
|
default:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
|
||||||
switch obj.Type() {
|
|
||||||
case objectSDK.TypeTombstone:
|
|
||||||
err = s.refillTombstoneObject(ctx, obj)
|
|
||||||
case objectSDK.TypeLock:
|
|
||||||
err = s.refillLockObject(ctx, obj)
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var mPrm meta.PutPrm
|
|
||||||
mPrm.SetObject(obj)
|
|
||||||
mPrm.SetStorageID(descriptor)
|
|
||||||
|
|
||||||
_, err = s.metaBase.Put(ctx, mPrm)
|
|
||||||
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
egErr := eg.Wait()
|
||||||
|
|
||||||
|
err = errors.Join(egErr, itErr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not put objects to the meta: %w", err)
|
return fmt.Errorf("could not put objects to the meta: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -227,6 +253,40 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
||||||
return fmt.Errorf("could not sync object counters: %w", err)
|
return fmt.Errorf("could not sync object counters: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
success = true
|
||||||
|
s.metricsWriter.SetRefillPercent(path, 100)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, descriptor []byte) error {
|
||||||
|
obj := objectSDK.New()
|
||||||
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
|
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
||||||
|
zap.Stringer("address", addr),
|
||||||
|
zap.String("err", err.Error()))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
switch obj.Type() {
|
||||||
|
case objectSDK.TypeTombstone:
|
||||||
|
err = s.refillTombstoneObject(ctx, obj)
|
||||||
|
case objectSDK.TypeLock:
|
||||||
|
err = s.refillLockObject(ctx, obj)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var mPrm meta.PutPrm
|
||||||
|
mPrm.SetObject(obj)
|
||||||
|
mPrm.SetStorageID(descriptor)
|
||||||
|
|
||||||
|
_, err = s.metaBase.Put(ctx, mPrm)
|
||||||
|
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -126,11 +126,15 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mm := NewMetricStore()
|
||||||
|
|
||||||
sh := New(
|
sh := New(
|
||||||
WithID(NewIDFromBytes([]byte{})),
|
WithID(NewIDFromBytes([]byte{})),
|
||||||
WithBlobStorOptions(blobOpts...),
|
WithBlobStorOptions(blobOpts...),
|
||||||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})))
|
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})),
|
||||||
|
WithMetricsWriter(mm),
|
||||||
|
)
|
||||||
require.NoError(t, sh.Open(context.Background()))
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
|
@ -157,7 +161,8 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
||||||
WithBlobStorOptions(blobOpts...),
|
WithBlobStorOptions(blobOpts...),
|
||||||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
|
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
|
||||||
WithRefillMetabase(true))
|
WithRefillMetabase(true),
|
||||||
|
WithMetricsWriter(mm))
|
||||||
require.NoError(t, sh.Open(context.Background()))
|
require.NoError(t, sh.Open(context.Background()))
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
|
@ -185,6 +190,8 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mm := NewMetricStore()
|
||||||
|
|
||||||
sh := New(
|
sh := New(
|
||||||
WithID(NewIDFromBytes([]byte{})),
|
WithID(NewIDFromBytes([]byte{})),
|
||||||
WithBlobStorOptions(blobOpts...),
|
WithBlobStorOptions(blobOpts...),
|
||||||
|
@ -194,6 +201,7 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
),
|
),
|
||||||
WithPiloramaOptions(
|
WithPiloramaOptions(
|
||||||
pilorama.WithPath(filepath.Join(p, "pilorama"))),
|
pilorama.WithPath(filepath.Join(p, "pilorama"))),
|
||||||
|
WithMetricsWriter(mm),
|
||||||
)
|
)
|
||||||
|
|
||||||
// open Blobstor
|
// open Blobstor
|
||||||
|
@ -362,6 +370,7 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
),
|
),
|
||||||
WithPiloramaOptions(
|
WithPiloramaOptions(
|
||||||
pilorama.WithPath(filepath.Join(p, "pilorama_another"))),
|
pilorama.WithPath(filepath.Join(p, "pilorama_another"))),
|
||||||
|
WithMetricsWriter(mm),
|
||||||
)
|
)
|
||||||
|
|
||||||
// open Blobstor
|
// open Blobstor
|
||||||
|
@ -389,4 +398,7 @@ func TestRefillMetabase(t *testing.T) {
|
||||||
checkObj(object.AddressOf(tombObj), tombObj)
|
checkObj(object.AddressOf(tombObj), tombObj)
|
||||||
checkTombMembers(true)
|
checkTombMembers(true)
|
||||||
checkLocked(t, cnrLocked, locked)
|
checkLocked(t, cnrLocked, locked)
|
||||||
|
require.Equal(t, int64(len(mObjs)+2), mm.refillCount) // 1 lock + 1 tomb
|
||||||
|
require.Equal(t, "completed", mm.refillStatus)
|
||||||
|
require.Equal(t, uint32(100), mm.refillPercent)
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,13 +21,28 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type metricsStore struct {
|
type metricsStore struct {
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
objCounters map[string]uint64
|
objCounters map[string]uint64
|
||||||
cnrSize map[string]int64
|
cnrSize map[string]int64
|
||||||
cnrCount map[string]uint64
|
cnrCount map[string]uint64
|
||||||
pldSize int64
|
pldSize int64
|
||||||
mode mode.Mode
|
mode mode.Mode
|
||||||
errCounter int64
|
errCounter int64
|
||||||
|
refillCount int64
|
||||||
|
refillSize int64
|
||||||
|
refillPercent uint32
|
||||||
|
refillStatus string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMetricStore() *metricsStore {
|
||||||
|
return &metricsStore{
|
||||||
|
objCounters: map[string]uint64{
|
||||||
|
"phy": 0,
|
||||||
|
"logic": 0,
|
||||||
|
},
|
||||||
|
cnrSize: make(map[string]int64),
|
||||||
|
cnrCount: make(map[string]uint64),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metricsStore) SetShardID(_ string) {}
|
func (m *metricsStore) SetShardID(_ string) {}
|
||||||
|
@ -155,6 +170,28 @@ func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool
|
||||||
return v, ok
|
return v, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *metricsStore) IncRefillObjectsCount(_ string, size int, success bool) {
|
||||||
|
m.mtx.Lock()
|
||||||
|
defer m.mtx.Unlock()
|
||||||
|
|
||||||
|
m.refillCount++
|
||||||
|
m.refillSize += int64(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricsStore) SetRefillPercent(_ string, percent uint32) {
|
||||||
|
m.mtx.Lock()
|
||||||
|
defer m.mtx.Unlock()
|
||||||
|
|
||||||
|
m.refillPercent = percent
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *metricsStore) SetRefillStatus(_ string, status string) {
|
||||||
|
m.mtx.Lock()
|
||||||
|
defer m.mtx.Unlock()
|
||||||
|
|
||||||
|
m.refillStatus = status
|
||||||
|
}
|
||||||
|
|
||||||
func TestCounters(t *testing.T) {
|
func TestCounters(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -361,14 +398,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
mm := &metricsStore{
|
mm := NewMetricStore()
|
||||||
objCounters: map[string]uint64{
|
|
||||||
"phy": 0,
|
|
||||||
"logic": 0,
|
|
||||||
},
|
|
||||||
cnrSize: make(map[string]int64),
|
|
||||||
cnrCount: make(map[string]uint64),
|
|
||||||
}
|
|
||||||
|
|
||||||
sh := New(
|
sh := New(
|
||||||
WithID(NewIDFromBytes([]byte{})),
|
WithID(NewIDFromBytes([]byte{})),
|
||||||
|
|
76
pkg/local_object_storage/shard/refill_test.go
Normal file
76
pkg/local_object_storage/shard/refill_test.go
Normal file
|
@ -0,0 +1,76 @@
|
||||||
|
package shard
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkRefillMetabase(b *testing.B) {
|
||||||
|
b.Run("100 objects", func(b *testing.B) {
|
||||||
|
benchRefillMetabase(b, 100)
|
||||||
|
})
|
||||||
|
|
||||||
|
b.Run("1000 objects", func(b *testing.B) {
|
||||||
|
benchRefillMetabase(b, 1000)
|
||||||
|
})
|
||||||
|
|
||||||
|
b.Run("2000 objects", func(b *testing.B) {
|
||||||
|
benchRefillMetabase(b, 2000)
|
||||||
|
})
|
||||||
|
|
||||||
|
b.Run("5000 objects", func(b *testing.B) {
|
||||||
|
benchRefillMetabase(b, 5000)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func benchRefillMetabase(b *testing.B, objectsCount int) {
|
||||||
|
sh := newCustomShard(b, false, shardOptions{
|
||||||
|
additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)},
|
||||||
|
})
|
||||||
|
|
||||||
|
defer func() { require.NoError(b, sh.Close()) }()
|
||||||
|
|
||||||
|
var putPrm PutPrm
|
||||||
|
|
||||||
|
for i := 0; i < objectsCount/2; i++ {
|
||||||
|
obj := testutil.GenerateObject()
|
||||||
|
testutil.AddAttribute(obj, "foo", "bar")
|
||||||
|
testutil.AddPayload(obj, 1<<5) // blobvnicza tree obj
|
||||||
|
|
||||||
|
putPrm.SetObject(obj)
|
||||||
|
|
||||||
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
|
require.NoError(b, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < objectsCount/2; i++ {
|
||||||
|
obj := testutil.GenerateObject()
|
||||||
|
testutil.AddAttribute(obj, "foo", "bar")
|
||||||
|
obj.SetID(oidtest.ID())
|
||||||
|
testutil.AddPayload(obj, 1<<20) // fstree obj
|
||||||
|
|
||||||
|
putPrm.SetObject(obj)
|
||||||
|
|
||||||
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
|
require.NoError(b, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(b, sh.Close())
|
||||||
|
require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path))
|
||||||
|
|
||||||
|
require.NoError(b, sh.Open(context.Background()))
|
||||||
|
sh.cfg.refillMetabase = true
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
require.NoError(b, sh.Init(context.Background()))
|
||||||
|
|
||||||
|
require.NoError(b, sh.Close())
|
||||||
|
}
|
|
@ -51,6 +51,7 @@ func TestShardReload(t *testing.T) {
|
||||||
WithMetaBaseOptions(metaOpts...),
|
WithMetaBaseOptions(metaOpts...),
|
||||||
WithPiloramaOptions(
|
WithPiloramaOptions(
|
||||||
pilorama.WithPath(filepath.Join(p, "pilorama"))),
|
pilorama.WithPath(filepath.Join(p, "pilorama"))),
|
||||||
|
WithMetricsWriter(NewMetricStore()),
|
||||||
}
|
}
|
||||||
|
|
||||||
sh := New(opts...)
|
sh := New(opts...)
|
||||||
|
|
|
@ -90,12 +90,19 @@ type MetricsWriter interface {
|
||||||
IncContainerObjectsCount(cnrID string, objectType string)
|
IncContainerObjectsCount(cnrID string, objectType string)
|
||||||
// SubContainerObjectsCount subtracts container object count.
|
// SubContainerObjectsCount subtracts container object count.
|
||||||
SubContainerObjectsCount(cnrID string, objectType string, value uint64)
|
SubContainerObjectsCount(cnrID string, objectType string, value uint64)
|
||||||
|
// IncRefillObjectsCount increments refill objects count.
|
||||||
|
IncRefillObjectsCount(path string, size int, success bool)
|
||||||
|
// SetRefillPercent sets refill percent.
|
||||||
|
SetRefillPercent(path string, percent uint32)
|
||||||
|
// SetRefillStatus sets refill status.
|
||||||
|
SetRefillStatus(path string, status string)
|
||||||
}
|
}
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
m sync.RWMutex
|
m sync.RWMutex
|
||||||
|
|
||||||
refillMetabase bool
|
refillMetabase bool
|
||||||
|
refillMetabaseWorkersCount int
|
||||||
|
|
||||||
rmBatchSize int
|
rmBatchSize int
|
||||||
|
|
||||||
|
@ -300,6 +307,13 @@ func WithRefillMetabase(v bool) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithRefillMetabaseWorkersCount returns option to set count of workers to refill the Metabase on Shard's initialization step.
|
||||||
|
func WithRefillMetabaseWorkersCount(v int) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.refillMetabaseWorkersCount = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
|
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
|
||||||
// - mode.ReadWrite;
|
// - mode.ReadWrite;
|
||||||
// - mode.ReadOnly.
|
// - mode.ReadOnly.
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package metrics
|
package metrics
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
@ -23,6 +24,9 @@ type EngineMetrics interface {
|
||||||
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||||
IncContainerObjectCounter(shardID, contID, objectType string)
|
IncContainerObjectCounter(shardID, contID, objectType string)
|
||||||
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||||
|
IncRefillObjectsCount(shardID, path string, size int, success bool)
|
||||||
|
SetRefillPercent(shardID, path string, percent uint32)
|
||||||
|
SetRefillStatus(shardID, path, status string)
|
||||||
|
|
||||||
WriteCache() WriteCacheMetrics
|
WriteCache() WriteCacheMetrics
|
||||||
GC() GCMetrics
|
GC() GCMetrics
|
||||||
|
@ -37,6 +41,11 @@ type engineMetrics struct {
|
||||||
mode *shardIDModeValue
|
mode *shardIDModeValue
|
||||||
contObjCounter *prometheus.GaugeVec
|
contObjCounter *prometheus.GaugeVec
|
||||||
|
|
||||||
|
refillStatus *shardIDPathModeValue
|
||||||
|
refillObjCounter *prometheus.GaugeVec
|
||||||
|
refillPayloadCounter *prometheus.GaugeVec
|
||||||
|
refillPercentCounter *prometheus.GaugeVec
|
||||||
|
|
||||||
gc *gcMetrics
|
gc *gcMetrics
|
||||||
writeCache *writeCacheMetrics
|
writeCache *writeCacheMetrics
|
||||||
}
|
}
|
||||||
|
@ -55,10 +64,14 @@ func newEngineMetrics() *engineMetrics {
|
||||||
objectCounter: newEngineGaugeVector("objects_total",
|
objectCounter: newEngineGaugeVector("objects_total",
|
||||||
"Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.",
|
"Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.",
|
||||||
[]string{shardIDLabel, typeLabel}),
|
[]string{shardIDLabel, typeLabel}),
|
||||||
gc: newGCMetrics(),
|
gc: newGCMetrics(),
|
||||||
writeCache: newWriteCacheMetrics(),
|
writeCache: newWriteCacheMetrics(),
|
||||||
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
|
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
|
||||||
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
|
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
|
||||||
|
refillStatus: newShardIDPathMode(engineSubsystem, "resync_metabase_status", "Resync from blobstore to metabase status"),
|
||||||
|
refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
|
||||||
|
refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
|
||||||
|
refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +119,11 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
|
||||||
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
|
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
|
||||||
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||||
m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||||
|
m.refillObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||||
|
m.refillPayloadCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||||
|
m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||||
m.mode.Delete(shardID)
|
m.mode.Delete(shardID)
|
||||||
|
m.refillStatus.DeleteByShardID(shardID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
|
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
|
||||||
|
@ -168,3 +185,31 @@ func (m *engineMetrics) WriteCache() WriteCacheMetrics {
|
||||||
func (m *engineMetrics) GC() GCMetrics {
|
func (m *engineMetrics) GC() GCMetrics {
|
||||||
return m.gc
|
return m.gc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *engineMetrics) IncRefillObjectsCount(shardID, path string, size int, success bool) {
|
||||||
|
m.refillObjCounter.With(
|
||||||
|
prometheus.Labels{
|
||||||
|
shardIDLabel: shardID,
|
||||||
|
pathLabel: path,
|
||||||
|
successLabel: strconv.FormatBool(success),
|
||||||
|
},
|
||||||
|
).Inc()
|
||||||
|
m.refillPayloadCounter.With(
|
||||||
|
prometheus.Labels{
|
||||||
|
shardIDLabel: shardID,
|
||||||
|
pathLabel: path,
|
||||||
|
successLabel: strconv.FormatBool(success),
|
||||||
|
},
|
||||||
|
).Add(float64(size))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
|
||||||
|
m.refillPercentCounter.With(prometheus.Labels{
|
||||||
|
shardIDLabel: shardID,
|
||||||
|
pathLabel: path,
|
||||||
|
}).Set(float64(percent))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
|
||||||
|
m.refillStatus.SetMode(shardID, path, status)
|
||||||
|
}
|
||||||
|
|
|
@ -74,6 +74,12 @@ func (m *shardIDPathModeValue) Delete(shardID, path string) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *shardIDPathModeValue) DeleteByShardID(shardID string) {
|
||||||
|
m.modeValue.DeletePartialMatch(prometheus.Labels{
|
||||||
|
shardIDLabel: shardID,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func modeFromBool(readOnly bool) string {
|
func modeFromBool(readOnly bool) string {
|
||||||
modeValue := readWriteMode
|
modeValue := readWriteMode
|
||||||
if readOnly {
|
if readOnly {
|
||||||
|
|
Loading…
Reference in a new issue