Speed up metabase refill #1027
|
@ -119,10 +119,11 @@ type shardCfg struct {
|
|||
estimateCompressibility bool
|
||||
estimateCompressibilityThreshold float64
|
||||
|
||||
smallSizeObjectLimit uint64
|
||||
uncompressableContentType []string
|
||||
refillMetabase bool
|
||||
mode shardmode.Mode
|
||||
smallSizeObjectLimit uint64
|
||||
uncompressableContentType []string
|
||||
refillMetabase bool
|
||||
refillMetabaseWorkersCount int
|
||||
mode shardmode.Mode
|
||||
|
||||
metaCfg struct {
|
||||
path string
|
||||
|
@ -234,6 +235,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
|
|||
var newConfig shardCfg
|
||||
|
||||
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
||||
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
||||
newConfig.mode = oldConfig.Mode()
|
||||
newConfig.compress = oldConfig.Compress()
|
||||
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
||||
|
@ -986,6 +988,7 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
|
|||
sh.shOpts = []shard.Option{
|
||||
shard.WithLogger(c.log),
|
||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
||||
shard.WithMode(shCfg.mode),
|
||||
shard.WithBlobStorOptions(blobstoreOpts...),
|
||||
shard.WithMetaBaseOptions(mbOptions...),
|
||||
|
|
|
@ -117,6 +117,7 @@ func TestEngineSection(t *testing.T) {
|
|||
|
||||
require.Equal(t, false, sc.RefillMetabase())
|
||||
require.Equal(t, mode.ReadOnly, sc.Mode())
|
||||
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
||||
case 1:
|
||||
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
||||
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
||||
|
@ -168,6 +169,7 @@ func TestEngineSection(t *testing.T) {
|
|||
|
||||
require.Equal(t, true, sc.RefillMetabase())
|
||||
require.Equal(t, mode.ReadWrite, sc.Mode())
|
||||
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
|
|
@ -18,6 +18,7 @@ const (
|
|||
// SmallSizeLimitDefault is a default limit of small objects payload in bytes.
|
||||
SmallSizeLimitDefault = 1 << 20
|
||||
EstimateCompressibilityThresholdDefault = 0.1
|
||||
RefillMetabaseWorkersCountDefault = 500
|
||||
)
|
||||
|
||||
// From wraps config section into Config.
|
||||
|
@ -134,6 +135,20 @@ func (x *Config) RefillMetabase() bool {
|
|||
)
|
||||
}
|
||||
|
||||
// RefillMetabaseWorkersCount returns the value of "resync_metabase_worker_count" config parameter.
|
||||
//
|
||||
// Returns RefillMetabaseWorkersCountDefault if the value is not a positive number.
|
||||
func (x *Config) RefillMetabaseWorkersCount() int {
|
||||
v := config.IntSafe(
|
||||
(*config.Config)(x),
|
||||
"resync_metabase_worker_count",
|
||||
)
|
||||
if v > 0 {
|
||||
return int(v)
|
||||
}
|
||||
return RefillMetabaseWorkersCountDefault
|
||||
}
|
||||
|
||||
// Mode return the value of "mode" config parameter.
|
||||
//
|
||||
// Panics if read the value is not one of predefined
|
||||
|
|
|
@ -98,6 +98,7 @@ FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
|
|||
## 0 shard
|
||||
### Flag to refill Metabase from BlobStor
|
||||
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
||||
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100
|
||||
### Flag to set shard mode
|
||||
FROSTFS_STORAGE_SHARD_0_MODE=read-only
|
||||
### Write cache config
|
||||
|
|
|
@ -144,6 +144,7 @@
|
|||
"0": {
|
||||
"mode": "read-only",
|
||||
"resync_metabase": false,
|
||||
"resync_metabase_worker_count": 100,
|
||||
"writecache": {
|
||||
"enabled": false,
|
||||
"no_sync": true,
|
||||
|
|
|
@ -167,6 +167,7 @@ storage:
|
|||
# degraded-read-only
|
||||
# disabled (do not work with the shard, allows to not remove it from the config)
|
||||
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
|
||||
resync_metabase_worker_count: 100
|
||||
|
||||
writecache:
|
||||
enabled: false
|
||||
|
|
|
@ -179,14 +179,15 @@ Contains configuration for each shard. Keys must be consecutive numbers starting
|
|||
`default` subsection has the same format and specifies defaults for missing values.
|
||||
The following table describes configuration for each shard.
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-------------------------------------|---------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| Parameter | Type | Default value | Description |
|
||||
| ------------------------------------------------ | ------------------------------------------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| `compress` | `bool` | `false` | Flag to enable compression. |
|
||||
| `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). |
|
||||
| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. |
|
||||
| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. |
|
||||
| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. |
|
||||
| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. |
|
||||
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
||||
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
||||
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
|
||||
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
||||
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
||||
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
||||
|
|
|
@ -579,6 +579,8 @@ const (
|
|||
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
|
||||
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
|
||||
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
|
||||
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
|
||||
BlobstoreFailedToGetFileinfo = "failed to get file info"
|
||||
ECFailedToSendToContainerNode = "failed to send EC object to container node"
|
||||
ECFailedToSaveECPart = "failed to save EC part"
|
||||
)
|
||||
|
|
|
@ -101,6 +101,10 @@ func (b *Blobovnicza) Init() error {
|
|||
return b.initializeCounters()
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) ObjectsCount() uint64 {
|
||||
return b.itemsCount.Load()
|
||||
}
|
||||
|
||||
func (b *Blobovnicza) initializeCounters() error {
|
||||
var size uint64
|
||||
var items uint64
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package blobovnicza
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
|
@ -158,7 +159,7 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
|
|||
}
|
||||
|
||||
if !prm.withoutData {
|
||||
elem.data = v
|
||||
elem.data = bytes.Clone(v)
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
}
|
||||
|
||||
return prm.handler(elem)
|
||||
|
|
38
pkg/local_object_storage/blobstor/blobovniczatree/count.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
)
|
||||
|
||||
func (b *Blobovniczas) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||
var (
|
||||
success bool
|
||||
startedAt = time.Now()
|
||||
)
|
||||
defer func() {
|
||||
b.metrics.ObjectsCount(time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.ObjectsCount")
|
||||
defer span.End()
|
||||
|
||||
var result uint64
|
||||
err := b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
|
||||
shDB := b.getBlobovniczaWithoutCaching(p)
|
||||
blz, err := shDB.Open()
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
defer shDB.Close()
|
||||
|
||||
result += blz.ObjectsCount()
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
|
@ -24,6 +24,7 @@ type Metrics interface {
|
|||
SetRebuildStatus(status string)
|
||||
ObjectMoved(d time.Duration)
|
||||
SetRebuildPercent(value uint32)
|
||||
ObjectsCount(d time.Duration, success bool)
|
||||
|
||||
Delete(d time.Duration, success, withStorageID bool)
|
||||
Exists(d time.Duration, success, withStorageID bool)
|
||||
|
@ -47,6 +48,7 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
|
|||
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
||||
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
||||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
|
||||
func (m *noopMetrics) Blobovnicza() blobovnicza.Metrics {
|
||||
return &blobovnicza.NoopMetrics{}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@ type Storage interface {
|
|||
|
||||
Type() string
|
||||
Path() string
|
||||
ObjectsCount(ctx context.Context) (uint64, error)
|
||||
|
||||
SetCompressor(cc *compression.Config)
|
||||
Compressor() *compression.Config
|
||||
|
|
|
@ -467,6 +467,45 @@ func (t *FSTree) countFiles() (uint64, error) {
|
|||
return counter, nil
|
||||
}
|
||||
|
||||
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
t.metrics.ObjectsCount(time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.ObjectsCount",
|
||||
trace.WithAttributes(
|
||||
attribute.String("path", t.RootPath),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
var result uint64
|
||||
|
||||
err := filepath.WalkDir(t.RootPath,
|
||||
func(_ string, d fs.DirEntry, _ error) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if !d.IsDir() {
|
||||
result++
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
|
||||
}
|
||||
success = true
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Type is fstree storage type used in logs and configuration.
|
||||
const Type = "fstree"
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ type Metrics interface {
|
|||
Put(d time.Duration, size int, success bool)
|
||||
Get(d time.Duration, size int, success bool)
|
||||
GetRange(d time.Duration, size int, success bool)
|
||||
ObjectsCount(d time.Duration, success bool)
|
||||
}
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
@ -27,3 +28,4 @@ func (m *noopMetrics) Exists(time.Duration, bool) {}
|
|||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) Get(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) GetRange(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
|
||||
|
|
|
@ -1,5 +1,14 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// DumpInfo returns information about blob stor.
|
||||
func (b *BlobStor) DumpInfo() Info {
|
||||
b.modeMtx.RLock()
|
||||
|
@ -15,3 +24,39 @@ func (b *BlobStor) DumpInfo() Info {
|
|||
SubStorages: sub,
|
||||
}
|
||||
}
|
||||
|
||||
// ObjectsCount returns Blobstore's total objects count.
|
||||
func (b *BlobStor) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||
var err error
|
||||
startedAt := time.Now()
|
||||
defer func() {
|
||||
b.metrics.ObjectsCount(time.Since(startedAt), err == nil)
|
||||
}()
|
||||
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.ObjectsCount")
|
||||
defer span.End()
|
||||
|
||||
b.modeMtx.RLock()
|
||||
defer b.modeMtx.RUnlock()
|
||||
|
||||
var result atomic.Uint64
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
for i := range b.storage {
|
||||
i := i
|
||||
eg.Go(func() error {
|
||||
v, e := b.storage[i].Storage.ObjectsCount(egCtx)
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
result.Add(v)
|
||||
fyrchik
commented
This won't work for blobovniczas:
This becomes worse the more small objects we have. Have you considered using naive but correct implementation? (traverse blobovniczas, maybe even count objects, not their size). This won't work for blobovniczas:
1. They don't reclaim memory and the whole 40mb db could be empty.
2. B-tree has some overhead for meta.
This becomes worse the more small objects we have.
Have you considered using naive but correct implementation? (traverse blobovniczas, maybe even count objects, not their size).
dstepanov-yadro
commented
That's why the method is called That's why the method is called `DiskUsage`, not `DataSize`.
Every blobovnicza stores data size, but to get this value it is required to open/close db. As it is used only for metrics, I think it is enough to calculate only disk size.
fyrchik
commented
But then metrics won't reflect what we deem them reflect (progress of operation). But then metrics won't reflect what we deem them reflect (progress of operation).
dstepanov-yadro
commented
Agree, fixed. Agree, fixed.
Now object count used for metrics, data size is hard to compute if compression is enabled.
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err = eg.Wait(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return result.Load(), nil
|
||||
}
|
||||
|
|
|
@ -163,3 +163,10 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
|
|||
func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
|
||||
return common.RebuildRes{}, nil
|
||||
}
|
||||
|
||||
func (s *memstoreImpl) ObjectsCount(_ context.Context) (uint64, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return uint64(len(s.objs)), nil
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ type Metrics interface {
|
|||
Get(d time.Duration, size int, success, withStorageID bool)
|
||||
Iterate(d time.Duration, success bool)
|
||||
Put(d time.Duration, size int, success bool)
|
||||
ObjectsCount(d time.Duration, success bool)
|
||||
}
|
||||
|
||||
type noopMetrics struct{}
|
||||
|
@ -26,3 +27,4 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
|
|||
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
|
||||
func (m *noopMetrics) Iterate(time.Duration, bool) {}
|
||||
func (m *noopMetrics) Put(time.Duration, int, bool) {}
|
||||
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
|
||||
|
|
|
@ -233,3 +233,10 @@ func (s *TestStore) SetParentID(string) {}
|
|||
func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
|
||||
return common.RebuildRes{}, nil
|
||||
}
|
||||
|
||||
func (s *TestStore) ObjectsCount(ctx context.Context) (uint64, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return s.st.ObjectsCount(ctx)
|
||||
}
|
||||
|
|
|
@ -27,6 +27,10 @@ type MetricRegister interface {
|
|||
IncContainerObjectCounter(shardID, contID, objectType string)
|
||||
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||
|
||||
IncRefillObjectsCount(shardID, path string, size int, success bool)
|
||||
SetRefillPercent(shardID, path string, percent uint32)
|
||||
SetRefillStatus(shardID, path, status string)
|
||||
|
||||
WriteCache() metrics.WriteCacheMetrics
|
||||
GC() metrics.GCMetrics
|
||||
}
|
||||
|
|
|
@ -85,6 +85,18 @@ func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string
|
|||
m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) IncRefillObjectsCount(path string, size int, success bool) {
|
||||
m.mw.IncRefillObjectsCount(m.id, path, size, success)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) SetRefillPercent(path string, percent uint32) {
|
||||
m.mw.SetRefillPercent(m.id, path, percent)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) SetRefillStatus(path string, status string) {
|
||||
m.mw.SetRefillStatus(m.id, path, status)
|
||||
}
|
||||
|
||||
// AddShard adds a new shard to the storage engine.
|
||||
//
|
||||
// Returns any error encountered that did not allow adding a shard.
|
||||
|
|
|
@ -87,6 +87,10 @@ func (m *blobovniczaTreeMetrics) Put(d time.Duration, size int, success bool) {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *blobovniczaTreeMetrics) ObjectsCount(d time.Duration, success bool) {
|
||||
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "ObjectsCount", d, success, metrics_impl.NullBool{})
|
||||
}
|
||||
|
||||
type blobovniczaMetrics struct {
|
||||
m metrics_impl.BlobobvnizcaMetrics
|
||||
shardID func() string
|
||||
|
|
|
@ -63,3 +63,7 @@ func (m *blobstoreMetrics) Put(d time.Duration, size int, success bool) {
|
|||
m.m.AddPut(m.shardID, size)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *blobstoreMetrics) ObjectsCount(d time.Duration, success bool) {
|
||||
m.m.MethodDuration(m.shardID, "ObjectsCount", d, success, metrics_impl.NullBool{})
|
||||
}
|
||||
|
|
|
@ -65,3 +65,7 @@ func (m *fstreeMetrics) GetRange(d time.Duration, size int, success bool) {
|
|||
m.m.AddGet(m.shardID, m.path, size)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *fstreeMetrics) ObjectsCount(d time.Duration, success bool) {
|
||||
m.m.MethodDuration(m.shardID, m.path, "ObjectsCount", d, success)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
|
@ -15,6 +16,7 @@ import (
|
|||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
||||
|
@ -180,44 +182,68 @@ func (s *Shard) Init(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (s *Shard) refillMetabase(ctx context.Context) error {
|
||||
path := s.metaBase.DumpInfo().Path
|
||||
s.metricsWriter.SetRefillStatus(path, "running")
|
||||
s.metricsWriter.SetRefillPercent(path, 0)
|
||||
var success bool
|
||||
defer func() {
|
||||
if success {
|
||||
s.metricsWriter.SetRefillStatus(path, "completed")
|
||||
} else {
|
||||
s.metricsWriter.SetRefillStatus(path, "failed")
|
||||
}
|
||||
}()
|
||||
|
||||
err := s.metaBase.Reset()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not reset metabase: %w", err)
|
||||
}
|
||||
|
||||
obj := objectSDK.New()
|
||||
withCount := true
|
||||
totalObjects, err := s.blobStor.ObjectsCount(ctx)
|
||||
if err != nil {
|
||||
acid-ant
commented
Why not to move timeout in config? Why not to move timeout in config?
dstepanov-yadro
commented
can't imagine the situation when we will change this. can't imagine the situation when we will change this.
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It looks strange to me: we can wait for 10 minutes and still do not receive any metrics. So why don't we allow ourselves hang here? It looks strange to me: we can wait for 10 minutes and still do not receive any metrics.
Ironically situations when disk usage calculation takes too long are also likely the ones where having metrics provides a huge QOL improvement. metabase resync duration has the order of _tens of hours_ for big disks, 10 minute is nothing in comparison.
So why don't we allow ourselves hang here?
dstepanov-yadro
commented
Fixed. Fixed.
|
||||
s.log.Warn(logs.EngineRefillFailedToGetObjectsCount, zap.Error(err))
|
||||
withCount = false
|
||||
}
|
||||
|
||||
err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
||||
zap.Stringer("address", addr),
|
||||
zap.String("err", err.Error()))
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
if s.cfg.refillMetabaseWorkersCount > 0 {
|
||||
eg.SetLimit(s.cfg.refillMetabaseWorkersCount)
|
||||
}
|
||||
|
||||
var completedCount uint64
|
||||
var metricGuard sync.Mutex
|
||||
itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||
eg.Go(func() error {
|
||||
var success bool
|
||||
defer func() {
|
||||
s.metricsWriter.IncRefillObjectsCount(path, len(data), success)
|
||||
if withCount {
|
||||
metricGuard.Lock()
|
||||
completedCount++
|
||||
s.metricsWriter.SetRefillPercent(path, uint32(completedCount*100/totalObjects))
|
||||
metricGuard.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
if err := s.refillObject(egCtx, data, addr, descriptor); err != nil {
|
||||
return err
|
||||
}
|
||||
success = true
|
||||
return nil
|
||||
})
|
||||
|
||||
select {
|
||||
case <-egCtx.Done():
|
||||
return egCtx.Err()
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
switch obj.Type() {
|
||||
case objectSDK.TypeTombstone:
|
||||
err = s.refillTombstoneObject(ctx, obj)
|
||||
case objectSDK.TypeLock:
|
||||
err = s.refillLockObject(ctx, obj)
|
||||
default:
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var mPrm meta.PutPrm
|
||||
mPrm.SetObject(obj)
|
||||
mPrm.SetStorageID(descriptor)
|
||||
|
||||
_, err = s.metaBase.Put(ctx, mPrm)
|
||||
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
egErr := eg.Wait()
|
||||
|
||||
err = errors.Join(egErr, itErr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not put objects to the meta: %w", err)
|
||||
}
|
||||
|
@ -227,6 +253,40 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
|||
return fmt.Errorf("could not sync object counters: %w", err)
|
||||
}
|
||||
|
||||
success = true
|
||||
s.metricsWriter.SetRefillPercent(path, 100)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, descriptor []byte) error {
|
||||
obj := objectSDK.New()
|
||||
if err := obj.Unmarshal(data); err != nil {
|
||||
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
||||
zap.Stringer("address", addr),
|
||||
zap.String("err", err.Error()))
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
switch obj.Type() {
|
||||
case objectSDK.TypeTombstone:
|
||||
err = s.refillTombstoneObject(ctx, obj)
|
||||
case objectSDK.TypeLock:
|
||||
err = s.refillLockObject(ctx, obj)
|
||||
default:
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var mPrm meta.PutPrm
|
||||
mPrm.SetObject(obj)
|
||||
mPrm.SetStorageID(descriptor)
|
||||
|
||||
_, err = s.metaBase.Put(ctx, mPrm)
|
||||
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -126,11 +126,15 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
}),
|
||||
}
|
||||
|
||||
mm := NewMetricStore()
|
||||
|
||||
sh := New(
|
||||
WithID(NewIDFromBytes([]byte{})),
|
||||
WithBlobStorOptions(blobOpts...),
|
||||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})))
|
||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})),
|
||||
WithMetricsWriter(mm),
|
||||
)
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
|
@ -157,7 +161,8 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
|
|||
WithBlobStorOptions(blobOpts...),
|
||||
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
|
||||
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
|
||||
WithRefillMetabase(true))
|
||||
WithRefillMetabase(true),
|
||||
WithMetricsWriter(mm))
|
||||
require.NoError(t, sh.Open(context.Background()))
|
||||
require.NoError(t, sh.Init(context.Background()))
|
||||
|
||||
|
@ -185,6 +190,8 @@ func TestRefillMetabase(t *testing.T) {
|
|||
}),
|
||||
}
|
||||
|
||||
mm := NewMetricStore()
|
||||
|
||||
sh := New(
|
||||
WithID(NewIDFromBytes([]byte{})),
|
||||
WithBlobStorOptions(blobOpts...),
|
||||
|
@ -194,6 +201,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
),
|
||||
WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(p, "pilorama"))),
|
||||
WithMetricsWriter(mm),
|
||||
)
|
||||
|
||||
// open Blobstor
|
||||
|
@ -362,6 +370,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
),
|
||||
WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(p, "pilorama_another"))),
|
||||
WithMetricsWriter(mm),
|
||||
)
|
||||
|
||||
// open Blobstor
|
||||
|
@ -389,4 +398,7 @@ func TestRefillMetabase(t *testing.T) {
|
|||
checkObj(object.AddressOf(tombObj), tombObj)
|
||||
checkTombMembers(true)
|
||||
checkLocked(t, cnrLocked, locked)
|
||||
require.Equal(t, int64(len(mObjs)+2), mm.refillCount) // 1 lock + 1 tomb
|
||||
require.Equal(t, "completed", mm.refillStatus)
|
||||
require.Equal(t, uint32(100), mm.refillPercent)
|
||||
}
|
||||
|
|
|
@ -21,13 +21,28 @@ import (
|
|||
)
|
||||
|
||||
type metricsStore struct {
|
||||
mtx sync.Mutex
|
||||
objCounters map[string]uint64
|
||||
cnrSize map[string]int64
|
||||
cnrCount map[string]uint64
|
||||
pldSize int64
|
||||
mode mode.Mode
|
||||
errCounter int64
|
||||
mtx sync.Mutex
|
||||
objCounters map[string]uint64
|
||||
cnrSize map[string]int64
|
||||
cnrCount map[string]uint64
|
||||
pldSize int64
|
||||
mode mode.Mode
|
||||
errCounter int64
|
||||
refillCount int64
|
||||
refillSize int64
|
||||
refillPercent uint32
|
||||
refillStatus string
|
||||
}
|
||||
|
||||
func NewMetricStore() *metricsStore {
|
||||
return &metricsStore{
|
||||
objCounters: map[string]uint64{
|
||||
"phy": 0,
|
||||
"logic": 0,
|
||||
},
|
||||
cnrSize: make(map[string]int64),
|
||||
cnrCount: make(map[string]uint64),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *metricsStore) SetShardID(_ string) {}
|
||||
|
@ -155,6 +170,28 @@ func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool
|
|||
return v, ok
|
||||
}
|
||||
|
||||
func (m *metricsStore) IncRefillObjectsCount(_ string, size int, success bool) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
m.refillCount++
|
||||
m.refillSize += int64(size)
|
||||
}
|
||||
|
||||
func (m *metricsStore) SetRefillPercent(_ string, percent uint32) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
m.refillPercent = percent
|
||||
}
|
||||
|
||||
func (m *metricsStore) SetRefillStatus(_ string, status string) {
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
m.refillStatus = status
|
||||
}
|
||||
|
||||
func TestCounters(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -361,14 +398,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
|
|||
}),
|
||||
}
|
||||
|
||||
mm := &metricsStore{
|
||||
objCounters: map[string]uint64{
|
||||
"phy": 0,
|
||||
"logic": 0,
|
||||
},
|
||||
cnrSize: make(map[string]int64),
|
||||
cnrCount: make(map[string]uint64),
|
||||
}
|
||||
mm := NewMetricStore()
|
||||
|
||||
sh := New(
|
||||
WithID(NewIDFromBytes([]byte{})),
|
||||
|
|
76
pkg/local_object_storage/shard/refill_test.go
Normal file
|
@ -0,0 +1,76 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func BenchmarkRefillMetabase(b *testing.B) {
|
||||
b.Run("100 objects", func(b *testing.B) {
|
||||
benchRefillMetabase(b, 100)
|
||||
})
|
||||
|
||||
b.Run("1000 objects", func(b *testing.B) {
|
||||
benchRefillMetabase(b, 1000)
|
||||
})
|
||||
|
||||
b.Run("2000 objects", func(b *testing.B) {
|
||||
benchRefillMetabase(b, 2000)
|
||||
})
|
||||
|
||||
b.Run("5000 objects", func(b *testing.B) {
|
||||
benchRefillMetabase(b, 5000)
|
||||
})
|
||||
}
|
||||
|
||||
func benchRefillMetabase(b *testing.B, objectsCount int) {
|
||||
sh := newCustomShard(b, false, shardOptions{
|
||||
additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)},
|
||||
})
|
||||
|
||||
defer func() { require.NoError(b, sh.Close()) }()
|
||||
|
||||
var putPrm PutPrm
|
||||
|
||||
for i := 0; i < objectsCount/2; i++ {
|
||||
obj := testutil.GenerateObject()
|
||||
testutil.AddAttribute(obj, "foo", "bar")
|
||||
testutil.AddPayload(obj, 1<<5) // blobvnicza tree obj
|
||||
|
||||
putPrm.SetObject(obj)
|
||||
|
||||
_, err := sh.Put(context.Background(), putPrm)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
|
||||
for i := 0; i < objectsCount/2; i++ {
|
||||
obj := testutil.GenerateObject()
|
||||
testutil.AddAttribute(obj, "foo", "bar")
|
||||
obj.SetID(oidtest.ID())
|
||||
testutil.AddPayload(obj, 1<<20) // fstree obj
|
||||
|
||||
putPrm.SetObject(obj)
|
||||
|
||||
_, err := sh.Put(context.Background(), putPrm)
|
||||
require.NoError(b, err)
|
||||
}
|
||||
|
||||
require.NoError(b, sh.Close())
|
||||
require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path))
|
||||
|
||||
require.NoError(b, sh.Open(context.Background()))
|
||||
sh.cfg.refillMetabase = true
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
require.NoError(b, sh.Init(context.Background()))
|
||||
|
||||
require.NoError(b, sh.Close())
|
||||
}
|
|
@ -51,6 +51,7 @@ func TestShardReload(t *testing.T) {
|
|||
WithMetaBaseOptions(metaOpts...),
|
||||
WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(p, "pilorama"))),
|
||||
WithMetricsWriter(NewMetricStore()),
|
||||
}
|
||||
|
||||
sh := New(opts...)
|
||||
|
|
|
@ -90,12 +90,19 @@ type MetricsWriter interface {
|
|||
IncContainerObjectsCount(cnrID string, objectType string)
|
||||
// SubContainerObjectsCount subtracts container object count.
|
||||
SubContainerObjectsCount(cnrID string, objectType string, value uint64)
|
||||
// IncRefillObjectsCount increments refill objects count.
|
||||
IncRefillObjectsCount(path string, size int, success bool)
|
||||
// SetRefillPercent sets refill percent.
|
||||
SetRefillPercent(path string, percent uint32)
|
||||
// SetRefillStatus sets refill status.
|
||||
SetRefillStatus(path string, status string)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
m sync.RWMutex
|
||||
|
||||
refillMetabase bool
|
||||
refillMetabase bool
|
||||
refillMetabaseWorkersCount int
|
||||
|
||||
rmBatchSize int
|
||||
|
||||
|
@ -300,6 +307,13 @@ func WithRefillMetabase(v bool) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithRefillMetabaseWorkersCount returns option to set count of workers to refill the Metabase on Shard's initialization step.
|
||||
func WithRefillMetabaseWorkersCount(v int) Option {
|
||||
return func(c *cfg) {
|
||||
c.refillMetabaseWorkersCount = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
|
||||
// - mode.ReadWrite;
|
||||
// - mode.ReadOnly.
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -23,6 +24,9 @@ type EngineMetrics interface {
|
|||
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||
IncContainerObjectCounter(shardID, contID, objectType string)
|
||||
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
|
||||
IncRefillObjectsCount(shardID, path string, size int, success bool)
|
||||
SetRefillPercent(shardID, path string, percent uint32)
|
||||
SetRefillStatus(shardID, path, status string)
|
||||
|
||||
WriteCache() WriteCacheMetrics
|
||||
GC() GCMetrics
|
||||
|
@ -37,6 +41,11 @@ type engineMetrics struct {
|
|||
mode *shardIDModeValue
|
||||
contObjCounter *prometheus.GaugeVec
|
||||
|
||||
refillStatus *shardIDPathModeValue
|
||||
refillObjCounter *prometheus.GaugeVec
|
||||
refillPayloadCounter *prometheus.GaugeVec
|
||||
refillPercentCounter *prometheus.GaugeVec
|
||||
|
||||
gc *gcMetrics
|
||||
writeCache *writeCacheMetrics
|
||||
}
|
||||
|
@ -55,10 +64,14 @@ func newEngineMetrics() *engineMetrics {
|
|||
objectCounter: newEngineGaugeVector("objects_total",
|
||||
"Objects counters per shards. DEPRECATED: Will be deleted in next releasese, use frostfs_node_engine_container_objects_total metric.",
|
||||
[]string{shardIDLabel, typeLabel}),
|
||||
gc: newGCMetrics(),
|
||||
writeCache: newWriteCacheMetrics(),
|
||||
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
|
||||
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
|
||||
gc: newGCMetrics(),
|
||||
writeCache: newWriteCacheMetrics(),
|
||||
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
|
||||
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
|
||||
refillStatus: newShardIDPathMode(engineSubsystem, "resync_metabase_status", "Resync from blobstore to metabase status"),
|
||||
refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
|
||||
refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
|
||||
refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,7 +119,11 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
|
|||
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.refillObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.refillPayloadCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
|
||||
m.mode.Delete(shardID)
|
||||
m.refillStatus.DeleteByShardID(shardID)
|
||||
}
|
||||
|
||||
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
|
||||
|
@ -168,3 +185,31 @@ func (m *engineMetrics) WriteCache() WriteCacheMetrics {
|
|||
func (m *engineMetrics) GC() GCMetrics {
|
||||
return m.gc
|
||||
}
|
||||
|
||||
func (m *engineMetrics) IncRefillObjectsCount(shardID, path string, size int, success bool) {
|
||||
m.refillObjCounter.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
successLabel: strconv.FormatBool(success),
|
||||
},
|
||||
).Inc()
|
||||
m.refillPayloadCounter.With(
|
||||
prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
successLabel: strconv.FormatBool(success),
|
||||
},
|
||||
).Add(float64(size))
|
||||
}
|
||||
|
||||
func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
|
||||
m.refillPercentCounter.With(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
pathLabel: path,
|
||||
}).Set(float64(percent))
|
||||
}
|
||||
|
||||
func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
|
||||
m.refillStatus.SetMode(shardID, path, status)
|
||||
}
|
||||
|
|
|
@ -74,6 +74,12 @@ func (m *shardIDPathModeValue) Delete(shardID, path string) {
|
|||
})
|
||||
}
|
||||
|
||||
func (m *shardIDPathModeValue) DeleteByShardID(shardID string) {
|
||||
m.modeValue.DeletePartialMatch(prometheus.Labels{
|
||||
shardIDLabel: shardID,
|
||||
})
|
||||
}
|
||||
|
||||
func modeFromBool(readOnly bool) string {
|
||||
modeValue := readWriteMode
|
||||
if readOnly {
|
||||
|
|
v
is only valid fortx
lifetime. So to usev
in separate goroutine it is required to copy it.bytes.Clone()
?bytes.Clone()
doesn't preallocate enough capacity.It does, it may overallocate though, but it is the solution from the stdlib.
Don't mind to leave it as is.
fixed