Speed up metabase refill #1027

Merged
fyrchik merged 4 commits from dstepanov-yadro/frostfs-node:feat/resync_speedup into master 2024-09-04 19:51:07 +00:00
32 changed files with 506 additions and 60 deletions

View file

@ -122,6 +122,7 @@ type shardCfg struct {
smallSizeObjectLimit uint64 smallSizeObjectLimit uint64
uncompressableContentType []string uncompressableContentType []string
refillMetabase bool refillMetabase bool
refillMetabaseWorkersCount int
mode shardmode.Mode mode shardmode.Mode
metaCfg struct { metaCfg struct {
@ -234,6 +235,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
var newConfig shardCfg var newConfig shardCfg
newConfig.refillMetabase = oldConfig.RefillMetabase() newConfig.refillMetabase = oldConfig.RefillMetabase()
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
newConfig.mode = oldConfig.Mode() newConfig.mode = oldConfig.Mode()
newConfig.compress = oldConfig.Compress() newConfig.compress = oldConfig.Compress()
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility() newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
@ -986,6 +988,7 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
sh.shOpts = []shard.Option{ sh.shOpts = []shard.Option{
shard.WithLogger(c.log), shard.WithLogger(c.log),
shard.WithRefillMetabase(shCfg.refillMetabase), shard.WithRefillMetabase(shCfg.refillMetabase),
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
shard.WithMode(shCfg.mode), shard.WithMode(shCfg.mode),
shard.WithBlobStorOptions(blobstoreOpts...), shard.WithBlobStorOptions(blobstoreOpts...),
shard.WithMetaBaseOptions(mbOptions...), shard.WithMetaBaseOptions(mbOptions...),

View file

@ -117,6 +117,7 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, false, sc.RefillMetabase()) require.Equal(t, false, sc.RefillMetabase())
require.Equal(t, mode.ReadOnly, sc.Mode()) require.Equal(t, mode.ReadOnly, sc.Mode())
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
case 1: case 1:
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path()) require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
require.Equal(t, fs.FileMode(0o644), pl.Perm()) require.Equal(t, fs.FileMode(0o644), pl.Perm())
@ -168,6 +169,7 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, true, sc.RefillMetabase()) require.Equal(t, true, sc.RefillMetabase())
require.Equal(t, mode.ReadWrite, sc.Mode()) require.Equal(t, mode.ReadWrite, sc.Mode())
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
} }
return nil return nil
}) })

View file

@ -18,6 +18,7 @@ const (
// SmallSizeLimitDefault is a default limit of small objects payload in bytes. // SmallSizeLimitDefault is a default limit of small objects payload in bytes.
SmallSizeLimitDefault = 1 << 20 SmallSizeLimitDefault = 1 << 20
EstimateCompressibilityThresholdDefault = 0.1 EstimateCompressibilityThresholdDefault = 0.1
RefillMetabaseWorkersCountDefault = 500
) )
// From wraps config section into Config. // From wraps config section into Config.
@ -134,6 +135,20 @@ func (x *Config) RefillMetabase() bool {
) )
} }
// RefillMetabaseWorkersCount returns the value of "resync_metabase_worker_count" config parameter.
//
// Returns RefillMetabaseWorkersCountDefault if the value is not a positive number.
func (x *Config) RefillMetabaseWorkersCount() int {
v := config.IntSafe(
(*config.Config)(x),
"resync_metabase_worker_count",
)
if v > 0 {
return int(v)
}
return RefillMetabaseWorkersCountDefault
}
// Mode return the value of "mode" config parameter. // Mode return the value of "mode" config parameter.
// //
// Panics if read the value is not one of predefined // Panics if read the value is not one of predefined

View file

@ -98,6 +98,7 @@ FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
## 0 shard ## 0 shard
### Flag to refill Metabase from BlobStor ### Flag to refill Metabase from BlobStor
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100
### Flag to set shard mode ### Flag to set shard mode
FROSTFS_STORAGE_SHARD_0_MODE=read-only FROSTFS_STORAGE_SHARD_0_MODE=read-only
### Write cache config ### Write cache config

View file

@ -144,6 +144,7 @@
"0": { "0": {
"mode": "read-only", "mode": "read-only",
"resync_metabase": false, "resync_metabase": false,
"resync_metabase_worker_count": 100,
"writecache": { "writecache": {
"enabled": false, "enabled": false,
"no_sync": true, "no_sync": true,

View file

@ -167,6 +167,7 @@ storage:
# degraded-read-only # degraded-read-only
# disabled (do not work with the shard, allows to not remove it from the config) # disabled (do not work with the shard, allows to not remove it from the config)
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
resync_metabase_worker_count: 100
writecache: writecache:
enabled: false enabled: false

View file

@ -180,13 +180,14 @@ Contains configuration for each shard. Keys must be consecutive numbers starting
The following table describes configuration for each shard. The following table describes configuration for each shard.
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
|-------------------------------------|---------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | ------------------------------------------------ | ------------------------------------------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `compress` | `bool` | `false` | Flag to enable compression. | | `compress` | `bool` | `false` | Flag to enable compression. |
| `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). | | `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). |
| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. | | `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. |
| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. | | `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. |
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` | | `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. | | `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. | | `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. | | `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. | | `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |

View file

@ -579,6 +579,8 @@ const (
EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node" EngineShardsEvacuationFailedToMoveTree = "failed to evacuate tree to other node"
EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node" EngineShardsEvacuationTreeEvacuatedLocal = "tree evacuated to local node"
EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node" EngineShardsEvacuationTreeEvacuatedRemote = "tree evacuated to other node"
EngineRefillFailedToGetObjectsCount = "failed to get blobstor objects count, no resync percent estimation is available"
BlobstoreFailedToGetFileinfo = "failed to get file info"
ECFailedToSendToContainerNode = "failed to send EC object to container node" ECFailedToSendToContainerNode = "failed to send EC object to container node"
ECFailedToSaveECPart = "failed to save EC part" ECFailedToSaveECPart = "failed to save EC part"
) )

View file

@ -101,6 +101,10 @@ func (b *Blobovnicza) Init() error {
return b.initializeCounters() return b.initializeCounters()
} }
func (b *Blobovnicza) ObjectsCount() uint64 {
return b.itemsCount.Load()
}
func (b *Blobovnicza) initializeCounters() error { func (b *Blobovnicza) initializeCounters() error {
var size uint64 var size uint64
var items uint64 var items uint64

View file

@ -1,6 +1,7 @@
package blobovnicza package blobovnicza
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"math" "math"
@ -158,7 +159,7 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
} }
if !prm.withoutData { if !prm.withoutData {
elem.data = v elem.data = bytes.Clone(v)
} }
return prm.handler(elem) return prm.handler(elem)

View file

@ -0,0 +1,38 @@
package blobovniczatree
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
)
func (b *Blobovniczas) ObjectsCount(ctx context.Context) (uint64, error) {
var (
success bool
startedAt = time.Now()
)
defer func() {
b.metrics.ObjectsCount(time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "Blobovniczas.ObjectsCount")
defer span.End()
var result uint64
err := b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
shDB := b.getBlobovniczaWithoutCaching(p)
blz, err := shDB.Open()
if err != nil {
return true, err
}
defer shDB.Close()
result += blz.ObjectsCount()
return false, nil
})
if err != nil {
return 0, err
}
return result, nil
}

View file

@ -24,6 +24,7 @@ type Metrics interface {
SetRebuildStatus(status string) SetRebuildStatus(status string)
ObjectMoved(d time.Duration) ObjectMoved(d time.Duration)
SetRebuildPercent(value uint32) SetRebuildPercent(value uint32)
ObjectsCount(d time.Duration, success bool)
Delete(d time.Duration, success, withStorageID bool) Delete(d time.Duration, success, withStorageID bool)
Exists(d time.Duration, success, withStorageID bool) Exists(d time.Duration, success, withStorageID bool)
@ -47,6 +48,7 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {} func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
func (m *noopMetrics) Iterate(time.Duration, bool) {} func (m *noopMetrics) Iterate(time.Duration, bool) {}
func (m *noopMetrics) Put(time.Duration, int, bool) {} func (m *noopMetrics) Put(time.Duration, int, bool) {}
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}
func (m *noopMetrics) Blobovnicza() blobovnicza.Metrics { func (m *noopMetrics) Blobovnicza() blobovnicza.Metrics {
return &blobovnicza.NoopMetrics{} return &blobovnicza.NoopMetrics{}
} }

View file

@ -15,6 +15,7 @@ type Storage interface {
Type() string Type() string
Path() string Path() string
ObjectsCount(ctx context.Context) (uint64, error)
SetCompressor(cc *compression.Config) SetCompressor(cc *compression.Config)
Compressor() *compression.Config Compressor() *compression.Config

View file

@ -467,6 +467,45 @@ func (t *FSTree) countFiles() (uint64, error) {
return counter, nil return counter, nil
} }
func (t *FSTree) ObjectsCount(ctx context.Context) (uint64, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.ObjectsCount(time.Since(startedAt), success)
}()
ctx, span := tracing.StartSpanFromContext(ctx, "FSTree.ObjectsCount",
trace.WithAttributes(
attribute.String("path", t.RootPath),
))
defer span.End()
var result uint64
err := filepath.WalkDir(t.RootPath,
func(_ string, d fs.DirEntry, _ error) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if !d.IsDir() {
result++
}
return nil
},
)
if err != nil {
return 0, fmt.Errorf("could not walk through %s directory: %w", t.RootPath, err)
}
success = true
return result, nil
}
// Type is fstree storage type used in logs and configuration. // Type is fstree storage type used in logs and configuration.
const Type = "fstree" const Type = "fstree"

View file

@ -14,6 +14,7 @@ type Metrics interface {
Put(d time.Duration, size int, success bool) Put(d time.Duration, size int, success bool)
Get(d time.Duration, size int, success bool) Get(d time.Duration, size int, success bool)
GetRange(d time.Duration, size int, success bool) GetRange(d time.Duration, size int, success bool)
ObjectsCount(d time.Duration, success bool)
} }
type noopMetrics struct{} type noopMetrics struct{}
@ -27,3 +28,4 @@ func (m *noopMetrics) Exists(time.Duration, bool) {}
func (m *noopMetrics) Put(time.Duration, int, bool) {} func (m *noopMetrics) Put(time.Duration, int, bool) {}
func (m *noopMetrics) Get(time.Duration, int, bool) {} func (m *noopMetrics) Get(time.Duration, int, bool) {}
func (m *noopMetrics) GetRange(time.Duration, int, bool) {} func (m *noopMetrics) GetRange(time.Duration, int, bool) {}
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}

View file

@ -1,5 +1,14 @@
package blobstor package blobstor
import (
"context"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"golang.org/x/sync/errgroup"
)
// DumpInfo returns information about blob stor. // DumpInfo returns information about blob stor.
func (b *BlobStor) DumpInfo() Info { func (b *BlobStor) DumpInfo() Info {
b.modeMtx.RLock() b.modeMtx.RLock()
@ -15,3 +24,39 @@ func (b *BlobStor) DumpInfo() Info {
SubStorages: sub, SubStorages: sub,
} }
} }
// ObjectsCount returns Blobstore's total objects count.
func (b *BlobStor) ObjectsCount(ctx context.Context) (uint64, error) {
var err error
startedAt := time.Now()
defer func() {
b.metrics.ObjectsCount(time.Since(startedAt), err == nil)
}()
ctx, span := tracing.StartSpanFromContext(ctx, "BlobStor.ObjectsCount")
defer span.End()
b.modeMtx.RLock()
defer b.modeMtx.RUnlock()
var result atomic.Uint64
eg, egCtx := errgroup.WithContext(ctx)
for i := range b.storage {
i := i
eg.Go(func() error {
v, e := b.storage[i].Storage.ObjectsCount(egCtx)
if e != nil {
return e
}
result.Add(v)
return nil
})
}
if err = eg.Wait(); err != nil {
return 0, err
}
return result.Load(), nil
}

View file

@ -163,3 +163,10 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) { func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil return common.RebuildRes{}, nil
} }
func (s *memstoreImpl) ObjectsCount(_ context.Context) (uint64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return uint64(len(s.objs)), nil
}

View file

@ -13,6 +13,7 @@ type Metrics interface {
Get(d time.Duration, size int, success, withStorageID bool) Get(d time.Duration, size int, success, withStorageID bool)
Iterate(d time.Duration, success bool) Iterate(d time.Duration, success bool)
Put(d time.Duration, size int, success bool) Put(d time.Duration, size int, success bool)
ObjectsCount(d time.Duration, success bool)
} }
type noopMetrics struct{} type noopMetrics struct{}
@ -26,3 +27,4 @@ func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}
func (m *noopMetrics) Get(time.Duration, int, bool, bool) {} func (m *noopMetrics) Get(time.Duration, int, bool, bool) {}
func (m *noopMetrics) Iterate(time.Duration, bool) {} func (m *noopMetrics) Iterate(time.Duration, bool) {}
func (m *noopMetrics) Put(time.Duration, int, bool) {} func (m *noopMetrics) Put(time.Duration, int, bool) {}
func (m *noopMetrics) ObjectsCount(time.Duration, bool) {}

View file

@ -233,3 +233,10 @@ func (s *TestStore) SetParentID(string) {}
func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) { func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil return common.RebuildRes{}, nil
} }
func (s *TestStore) ObjectsCount(ctx context.Context) (uint64, error) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.st.ObjectsCount(ctx)
}

View file

@ -27,6 +27,10 @@ type MetricRegister interface {
IncContainerObjectCounter(shardID, contID, objectType string) IncContainerObjectCounter(shardID, contID, objectType string)
SubContainerObjectCounter(shardID, contID, objectType string, v uint64) SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
WriteCache() metrics.WriteCacheMetrics WriteCache() metrics.WriteCacheMetrics
GC() metrics.GCMetrics GC() metrics.GCMetrics
} }

View file

@ -85,6 +85,18 @@ func (m *metricsWithID) SubContainerObjectsCount(cnrID string, objectType string
m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value) m.mw.SubContainerObjectCounter(m.id, cnrID, objectType, value)
} }
func (m *metricsWithID) IncRefillObjectsCount(path string, size int, success bool) {
m.mw.IncRefillObjectsCount(m.id, path, size, success)
}
func (m *metricsWithID) SetRefillPercent(path string, percent uint32) {
m.mw.SetRefillPercent(m.id, path, percent)
}
func (m *metricsWithID) SetRefillStatus(path string, status string) {
m.mw.SetRefillStatus(m.id, path, status)
}
// AddShard adds a new shard to the storage engine. // AddShard adds a new shard to the storage engine.
// //
// Returns any error encountered that did not allow adding a shard. // Returns any error encountered that did not allow adding a shard.

View file

@ -87,6 +87,10 @@ func (m *blobovniczaTreeMetrics) Put(d time.Duration, size int, success bool) {
} }
} }
func (m *blobovniczaTreeMetrics) ObjectsCount(d time.Duration, success bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "ObjectsCount", d, success, metrics_impl.NullBool{})
}
type blobovniczaMetrics struct { type blobovniczaMetrics struct {
m metrics_impl.BlobobvnizcaMetrics m metrics_impl.BlobobvnizcaMetrics
shardID func() string shardID func() string

View file

@ -63,3 +63,7 @@ func (m *blobstoreMetrics) Put(d time.Duration, size int, success bool) {
m.m.AddPut(m.shardID, size) m.m.AddPut(m.shardID, size)
} }
} }
func (m *blobstoreMetrics) ObjectsCount(d time.Duration, success bool) {
m.m.MethodDuration(m.shardID, "ObjectsCount", d, success, metrics_impl.NullBool{})
}

View file

@ -65,3 +65,7 @@ func (m *fstreeMetrics) GetRange(d time.Duration, size int, success bool) {
m.m.AddGet(m.shardID, m.path, size) m.m.AddGet(m.shardID, m.path, size)
} }
} }
func (m *fstreeMetrics) ObjectsCount(d time.Duration, success bool) {
m.m.MethodDuration(m.shardID, m.path, "ObjectsCount", d, success)
}

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -15,6 +16,7 @@ import (
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
) )
func (s *Shard) handleMetabaseFailure(stage string, err error) error { func (s *Shard) handleMetabaseFailure(stage string, err error) error {
@ -180,14 +182,84 @@ func (s *Shard) Init(ctx context.Context) error {
} }
func (s *Shard) refillMetabase(ctx context.Context) error { func (s *Shard) refillMetabase(ctx context.Context) error {
path := s.metaBase.DumpInfo().Path
s.metricsWriter.SetRefillStatus(path, "running")
s.metricsWriter.SetRefillPercent(path, 0)
var success bool
defer func() {
if success {
s.metricsWriter.SetRefillStatus(path, "completed")
} else {
s.metricsWriter.SetRefillStatus(path, "failed")
}
}()
err := s.metaBase.Reset() err := s.metaBase.Reset()
if err != nil { if err != nil {
return fmt.Errorf("could not reset metabase: %w", err) return fmt.Errorf("could not reset metabase: %w", err)
} }
obj := objectSDK.New() withCount := true
totalObjects, err := s.blobStor.ObjectsCount(ctx)
if err != nil {
s.log.Warn(logs.EngineRefillFailedToGetObjectsCount, zap.Error(err))
withCount = false
}
err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { eg, egCtx := errgroup.WithContext(ctx)
if s.cfg.refillMetabaseWorkersCount > 0 {
eg.SetLimit(s.cfg.refillMetabaseWorkersCount)
}
var completedCount uint64
var metricGuard sync.Mutex
itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
eg.Go(func() error {
var success bool
defer func() {
s.metricsWriter.IncRefillObjectsCount(path, len(data), success)
if withCount {
metricGuard.Lock()
completedCount++
s.metricsWriter.SetRefillPercent(path, uint32(completedCount*100/totalObjects))
metricGuard.Unlock()
}
}()
if err := s.refillObject(egCtx, data, addr, descriptor); err != nil {
return err
}
success = true
return nil
})
select {
case <-egCtx.Done():
return egCtx.Err()
default:
return nil
}
})
egErr := eg.Wait()
err = errors.Join(egErr, itErr)
if err != nil {
return fmt.Errorf("could not put objects to the meta: %w", err)
}
err = s.metaBase.SyncCounters()
if err != nil {
return fmt.Errorf("could not sync object counters: %w", err)
}
success = true
s.metricsWriter.SetRefillPercent(path, 100)
return nil
}
func (s *Shard) refillObject(ctx context.Context, data []byte, addr oid.Address, descriptor []byte) error {
obj := objectSDK.New()
if err := obj.Unmarshal(data); err != nil { if err := obj.Unmarshal(data); err != nil {
s.log.Warn(logs.ShardCouldNotUnmarshalObject, s.log.Warn(logs.ShardCouldNotUnmarshalObject,
zap.Stringer("address", addr), zap.Stringer("address", addr),
@ -215,18 +287,6 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
return err return err
} }
return nil
})
if err != nil {
return fmt.Errorf("could not put objects to the meta: %w", err)
}
err = s.metaBase.SyncCounters()
if err != nil {
return fmt.Errorf("could not sync object counters: %w", err)
}
return nil return nil
} }

View file

@ -126,11 +126,15 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
}), }),
} }
mm := NewMetricStore()
sh := New( sh := New(
WithID(NewIDFromBytes([]byte{})), WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...), WithBlobStorOptions(blobOpts...),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{}))) WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta")), meta.WithEpochState(epochState{})),
WithMetricsWriter(mm),
)
require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
@ -157,7 +161,8 @@ func TestRefillMetabaseCorrupted(t *testing.T) {
WithBlobStorOptions(blobOpts...), WithBlobStorOptions(blobOpts...),
WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))), WithPiloramaOptions(pilorama.WithPath(filepath.Join(dir, "pilorama"))),
WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})), WithMetaBaseOptions(meta.WithPath(filepath.Join(dir, "meta_new")), meta.WithEpochState(epochState{})),
WithRefillMetabase(true)) WithRefillMetabase(true),
WithMetricsWriter(mm))
require.NoError(t, sh.Open(context.Background())) require.NoError(t, sh.Open(context.Background()))
require.NoError(t, sh.Init(context.Background())) require.NoError(t, sh.Init(context.Background()))
@ -185,6 +190,8 @@ func TestRefillMetabase(t *testing.T) {
}), }),
} }
mm := NewMetricStore()
sh := New( sh := New(
WithID(NewIDFromBytes([]byte{})), WithID(NewIDFromBytes([]byte{})),
WithBlobStorOptions(blobOpts...), WithBlobStorOptions(blobOpts...),
@ -194,6 +201,7 @@ func TestRefillMetabase(t *testing.T) {
), ),
WithPiloramaOptions( WithPiloramaOptions(
pilorama.WithPath(filepath.Join(p, "pilorama"))), pilorama.WithPath(filepath.Join(p, "pilorama"))),
WithMetricsWriter(mm),
) )
// open Blobstor // open Blobstor
@ -362,6 +370,7 @@ func TestRefillMetabase(t *testing.T) {
), ),
WithPiloramaOptions( WithPiloramaOptions(
pilorama.WithPath(filepath.Join(p, "pilorama_another"))), pilorama.WithPath(filepath.Join(p, "pilorama_another"))),
WithMetricsWriter(mm),
) )
// open Blobstor // open Blobstor
@ -389,4 +398,7 @@ func TestRefillMetabase(t *testing.T) {
checkObj(object.AddressOf(tombObj), tombObj) checkObj(object.AddressOf(tombObj), tombObj)
checkTombMembers(true) checkTombMembers(true)
checkLocked(t, cnrLocked, locked) checkLocked(t, cnrLocked, locked)
require.Equal(t, int64(len(mObjs)+2), mm.refillCount) // 1 lock + 1 tomb
require.Equal(t, "completed", mm.refillStatus)
require.Equal(t, uint32(100), mm.refillPercent)
} }

View file

@ -28,6 +28,21 @@ type metricsStore struct {
pldSize int64 pldSize int64
mode mode.Mode mode mode.Mode
errCounter int64 errCounter int64
refillCount int64
refillSize int64
refillPercent uint32
refillStatus string
}
func NewMetricStore() *metricsStore {
return &metricsStore{
objCounters: map[string]uint64{
"phy": 0,
"logic": 0,
},
cnrSize: make(map[string]int64),
cnrCount: make(map[string]uint64),
}
} }
func (m *metricsStore) SetShardID(_ string) {} func (m *metricsStore) SetShardID(_ string) {}
@ -155,6 +170,28 @@ func (m *metricsStore) getContainerCount(cnrID, objectType string) (uint64, bool
return v, ok return v, ok
} }
func (m *metricsStore) IncRefillObjectsCount(_ string, size int, success bool) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.refillCount++
m.refillSize += int64(size)
}
func (m *metricsStore) SetRefillPercent(_ string, percent uint32) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.refillPercent = percent
}
func (m *metricsStore) SetRefillStatus(_ string, status string) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.refillStatus = status
}
func TestCounters(t *testing.T) { func TestCounters(t *testing.T) {
t.Parallel() t.Parallel()
@ -361,14 +398,7 @@ func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
}), }),
} }
mm := &metricsStore{ mm := NewMetricStore()
objCounters: map[string]uint64{
"phy": 0,
"logic": 0,
},
cnrSize: make(map[string]int64),
cnrCount: make(map[string]uint64),
}
sh := New( sh := New(
WithID(NewIDFromBytes([]byte{})), WithID(NewIDFromBytes([]byte{})),

View file

@ -0,0 +1,76 @@
package shard
import (
"context"
"os"
"testing"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func BenchmarkRefillMetabase(b *testing.B) {
b.Run("100 objects", func(b *testing.B) {
benchRefillMetabase(b, 100)
})
b.Run("1000 objects", func(b *testing.B) {
benchRefillMetabase(b, 1000)
})
b.Run("2000 objects", func(b *testing.B) {
benchRefillMetabase(b, 2000)
})
b.Run("5000 objects", func(b *testing.B) {
benchRefillMetabase(b, 5000)
})
}
func benchRefillMetabase(b *testing.B, objectsCount int) {
sh := newCustomShard(b, false, shardOptions{
additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)},
})
defer func() { require.NoError(b, sh.Close()) }()
var putPrm PutPrm
for i := 0; i < objectsCount/2; i++ {
obj := testutil.GenerateObject()
testutil.AddAttribute(obj, "foo", "bar")
testutil.AddPayload(obj, 1<<5) // blobvnicza tree obj
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(b, err)
}
for i := 0; i < objectsCount/2; i++ {
obj := testutil.GenerateObject()
testutil.AddAttribute(obj, "foo", "bar")
obj.SetID(oidtest.ID())
testutil.AddPayload(obj, 1<<20) // fstree obj
putPrm.SetObject(obj)
_, err := sh.Put(context.Background(), putPrm)
require.NoError(b, err)
}
require.NoError(b, sh.Close())
require.NoError(b, os.Remove(sh.metaBase.DumpInfo().Path))
require.NoError(b, sh.Open(context.Background()))
sh.cfg.refillMetabase = true
b.ReportAllocs()
b.ResetTimer()
require.NoError(b, sh.Init(context.Background()))
require.NoError(b, sh.Close())
}

View file

@ -51,6 +51,7 @@ func TestShardReload(t *testing.T) {
WithMetaBaseOptions(metaOpts...), WithMetaBaseOptions(metaOpts...),
WithPiloramaOptions( WithPiloramaOptions(
pilorama.WithPath(filepath.Join(p, "pilorama"))), pilorama.WithPath(filepath.Join(p, "pilorama"))),
WithMetricsWriter(NewMetricStore()),
} }
sh := New(opts...) sh := New(opts...)

View file

@ -90,12 +90,19 @@ type MetricsWriter interface {
IncContainerObjectsCount(cnrID string, objectType string) IncContainerObjectsCount(cnrID string, objectType string)
// SubContainerObjectsCount subtracts container object count. // SubContainerObjectsCount subtracts container object count.
SubContainerObjectsCount(cnrID string, objectType string, value uint64) SubContainerObjectsCount(cnrID string, objectType string, value uint64)
// IncRefillObjectsCount increments refill objects count.
IncRefillObjectsCount(path string, size int, success bool)
// SetRefillPercent sets refill percent.
SetRefillPercent(path string, percent uint32)
// SetRefillStatus sets refill status.
SetRefillStatus(path string, status string)
} }
type cfg struct { type cfg struct {
m sync.RWMutex m sync.RWMutex
refillMetabase bool refillMetabase bool
refillMetabaseWorkersCount int
rmBatchSize int rmBatchSize int
@ -300,6 +307,13 @@ func WithRefillMetabase(v bool) Option {
} }
} }
// WithRefillMetabaseWorkersCount returns option to set count of workers to refill the Metabase on Shard's initialization step.
func WithRefillMetabaseWorkersCount(v int) Option {
return func(c *cfg) {
c.refillMetabaseWorkersCount = v
}
}
// WithMode returns option to set shard's mode. Mode must be one of the predefined: // WithMode returns option to set shard's mode. Mode must be one of the predefined:
// - mode.ReadWrite; // - mode.ReadWrite;
// - mode.ReadOnly. // - mode.ReadOnly.

View file

@ -1,6 +1,7 @@
package metrics package metrics
import ( import (
"strconv"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@ -23,6 +24,9 @@ type EngineMetrics interface {
SetContainerObjectCounter(shardID, contID, objectType string, v uint64) SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncContainerObjectCounter(shardID, contID, objectType string) IncContainerObjectCounter(shardID, contID, objectType string)
SubContainerObjectCounter(shardID, contID, objectType string, v uint64) SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
WriteCache() WriteCacheMetrics WriteCache() WriteCacheMetrics
GC() GCMetrics GC() GCMetrics
@ -37,6 +41,11 @@ type engineMetrics struct {
mode *shardIDModeValue mode *shardIDModeValue
contObjCounter *prometheus.GaugeVec contObjCounter *prometheus.GaugeVec
refillStatus *shardIDPathModeValue
refillObjCounter *prometheus.GaugeVec
refillPayloadCounter *prometheus.GaugeVec
refillPercentCounter *prometheus.GaugeVec
gc *gcMetrics gc *gcMetrics
writeCache *writeCacheMetrics writeCache *writeCacheMetrics
} }
@ -59,6 +68,10 @@ func newEngineMetrics() *engineMetrics {
writeCache: newWriteCacheMetrics(), writeCache: newWriteCacheMetrics(),
mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"), mode: newShardIDMode(engineSubsystem, "mode_info", "Shard mode"),
contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}), contObjCounter: newEngineGaugeVector("container_objects_total", "Count of objects for each container", []string{shardIDLabel, containerIDLabelKey, typeLabel}),
refillStatus: newShardIDPathMode(engineSubsystem, "resync_metabase_status", "Resync from blobstore to metabase status"),
refillObjCounter: newEngineGaugeVector("resync_metabase_objects_total", "Count of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
refillPayloadCounter: newEngineGaugeVector("resync_metabase_objects_size_bytes", "Size of objects resynced from blobstore to metabase", []string{shardIDLabel, pathLabel, successLabel}),
refillPercentCounter: newEngineGaugeVector("resync_metabase_complete_percent", "Percent of resynced from blobstore to metabase completeness", []string{shardIDLabel, pathLabel}),
} }
} }
@ -106,7 +119,11 @@ func (m *engineMetrics) DeleteShardMetrics(shardID string) {
m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID}) m.payloadSize.Delete(prometheus.Labels{shardIDLabel: shardID})
m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) m.objectCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID}) m.contObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.refillObjCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.refillPayloadCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.refillPercentCounter.DeletePartialMatch(prometheus.Labels{shardIDLabel: shardID})
m.mode.Delete(shardID) m.mode.Delete(shardID)
m.refillStatus.DeleteByShardID(shardID)
} }
func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) { func (m *engineMetrics) AddToObjectCounter(shardID, objectType string, delta int) {
@ -168,3 +185,31 @@ func (m *engineMetrics) WriteCache() WriteCacheMetrics {
func (m *engineMetrics) GC() GCMetrics { func (m *engineMetrics) GC() GCMetrics {
return m.gc return m.gc
} }
func (m *engineMetrics) IncRefillObjectsCount(shardID, path string, size int, success bool) {
m.refillObjCounter.With(
prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
successLabel: strconv.FormatBool(success),
},
).Inc()
m.refillPayloadCounter.With(
prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
successLabel: strconv.FormatBool(success),
},
).Add(float64(size))
}
func (m *engineMetrics) SetRefillPercent(shardID, path string, percent uint32) {
m.refillPercentCounter.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Set(float64(percent))
}
func (m *engineMetrics) SetRefillStatus(shardID, path, status string) {
m.refillStatus.SetMode(shardID, path, status)
}

View file

@ -74,6 +74,12 @@ func (m *shardIDPathModeValue) Delete(shardID, path string) {
}) })
} }
func (m *shardIDPathModeValue) DeleteByShardID(shardID string) {
m.modeValue.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
})
}
func modeFromBool(readOnly bool) string { func modeFromBool(readOnly bool) string {
modeValue := readWriteMode modeValue := readWriteMode
if readOnly { if readOnly {