Blobovnicza tree rebuild (support) #703

Merged
fyrchik merged 18 commits from dstepanov-yadro/frostfs-node:feat/shard_migrator_support_v0.37 into support/v0.37 2024-09-04 19:51:03 +00:00
59 changed files with 2328 additions and 175 deletions

View file

@ -5,7 +5,7 @@ import (
"fmt"
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -40,7 +40,7 @@ func inspectFunc(cmd *cobra.Command, _ []string) {
common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err))
if id := resStorageID.StorageID(); id != nil {
cmd.Printf("Object storageID: %s\n\n", blobovnicza.NewIDFromBytes(id).String())
cmd.Printf("Object storageID: %s\n\n", blobovniczatree.NewIDFromBytes(id).Path())
} else {
cmd.Printf("Object does not contain storageID\n\n")
}

View file

@ -101,6 +101,7 @@ type applicationConfiguration struct {
shardPoolSize uint32
shards []shardCfg
lowMem bool
rebuildWorkers uint32
}
}
@ -176,6 +177,8 @@ type subStorageCfg struct {
width uint64
leafWidth uint64
openedCacheSize int
initWorkerCount int
initInAdvance bool
}
// readConfig fills applicationConfiguration with raw configuration values
@ -207,6 +210,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}
@ -291,6 +295,8 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
sCfg.width = sub.ShallowWidth()
sCfg.leafWidth = sub.LeafWidth()
sCfg.openedCacheSize = sub.OpenedCacheSize()
sCfg.initWorkerCount = sub.InitWorkerCount()
sCfg.initInAdvance = sub.InitInAdvance()
case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth()
@ -685,13 +691,14 @@ func initCfgObject(appCfg *config.Config) cfgObject {
}
func (c *cfg) engineOpts() []engine.Option {
opts := make([]engine.Option, 0, 4)
var opts []engine.Option
opts = append(opts,
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers),
)
if c.metricsCollector != nil {
@ -780,7 +787,10 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
blobovniczatree.WithInitInAdvance(sRead.initInAdvance),
blobovniczatree.WithLogger(c.log),
blobovniczatree.WithObjectSizeLimit(shCfg.smallSizeObjectLimit),
}
if c.metricsCollector != nil {

View file

@ -15,6 +15,9 @@ const (
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
// process object PUT operations in a storage engine.
ShardPoolSizeDefault = 20
// RebuildWorkersCountDefault is a default value of the workers count to
// process storage rebuild operations in a storage engine.
RebuildWorkersCountDefault = 100
)
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@ -88,3 +91,11 @@ func ShardErrorThreshold(c *config.Config) uint32 {
func EngineLowMemoryConsumption(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "low_mem")
}
// EngineRebuildWorkersCount returns value of "rebuild_workers_count" config parmeter from "storage" section.
func EngineRebuildWorkersCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "rebuild_workers_count"); v > 0 {
return v
}
return RebuildWorkersCountDefault
}

View file

@ -38,15 +38,17 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
require.EqualValues(t, engineconfig.RebuildWorkersCountDefault, engineconfig.EngineRebuildWorkersCount(empty))
})
const path = "../../../../config/example/node"
var fileConfigTest = func(c *config.Config) {
fileConfigTest := func(c *config.Config) {
num := 0
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
require.EqualValues(t, uint32(1000), engineconfig.EngineRebuildWorkersCount(c))
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
defer func() {
@ -78,7 +80,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 3221225472, wc.SizeLimit())
require.Equal(t, "tmp/0/meta", meta.Path())
require.Equal(t, fs.FileMode(0644), meta.BoltDB().Perm())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
require.Equal(t, 100, meta.BoltDB().MaxBatchSize())
require.Equal(t, 10*time.Millisecond, meta.BoltDB().MaxBatchDelay())
@ -89,15 +91,17 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, 2, len(ss))
blz := blobovniczaconfig.From((*config.Config)(ss[0]))
require.Equal(t, "tmp/0/blob/blobovnicza", ss[0].Path())
require.EqualValues(t, 0644, blz.BoltDB().Perm())
require.EqualValues(t, 0o644, blz.BoltDB().Perm())
require.EqualValues(t, 4194304, blz.Size())
require.EqualValues(t, 1, blz.ShallowDepth())
require.EqualValues(t, 4, blz.ShallowWidth())
require.EqualValues(t, 50, blz.OpenedCacheSize())
require.EqualValues(t, 10, blz.LeafWidth())
require.EqualValues(t, 10, blz.InitWorkerCount())
require.EqualValues(t, true, blz.InitInAdvance())
require.Equal(t, "tmp/0/blob", ss[1].Path())
require.EqualValues(t, 0644, ss[1].Perm())
require.EqualValues(t, 0o644, ss[1].Perm())
fst := fstreeconfig.From((*config.Config)(ss[1]))
require.EqualValues(t, 5, fst.Depth())
@ -112,7 +116,7 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, mode.ReadOnly, sc.Mode())
case 1:
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
require.Equal(t, fs.FileMode(0644), pl.Perm())
require.Equal(t, fs.FileMode(0o644), pl.Perm())
require.True(t, pl.NoSync())
require.Equal(t, 5*time.Millisecond, pl.MaxBatchDelay())
require.Equal(t, 100, pl.MaxBatchSize())
@ -127,7 +131,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 4294967296, wc.SizeLimit())
require.Equal(t, "tmp/1/meta", meta.Path())
require.Equal(t, fs.FileMode(0644), meta.BoltDB().Perm())
require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
require.Equal(t, 200, meta.BoltDB().MaxBatchSize())
require.Equal(t, 20*time.Millisecond, meta.BoltDB().MaxBatchDelay())
@ -144,9 +148,10 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 4, blz.ShallowWidth())
require.EqualValues(t, 50, blz.OpenedCacheSize())
require.EqualValues(t, 10, blz.LeafWidth())
require.EqualValues(t, blobovniczaconfig.InitWorkerCountDefault, blz.InitWorkerCount())
require.Equal(t, "tmp/1/blob", ss[1].Path())
require.EqualValues(t, 0644, ss[1].Perm())
require.EqualValues(t, 0o644, ss[1].Perm())
fst := fstreeconfig.From((*config.Config)(ss[1]))
require.EqualValues(t, 5, fst.Depth())

View file

@ -22,6 +22,9 @@ const (
// OpenedCacheSizeDefault is a default cache size of opened Blobovnicza's.
OpenedCacheSizeDefault = 16
// InitWorkerCountDefault is a default workers count to initialize Blobovnicza's.
InitWorkerCountDefault = 5
)
// From wraps config section into Config.
@ -112,3 +115,29 @@ func (x *Config) LeafWidth() uint64 {
"leaf_width",
)
}
// InitWorkersCount returns the value of "init_worker_count" config parameter.
//
// Returns InitWorkerCountDefault if the value is not a positive number.
func (x *Config) InitWorkerCount() int {
d := config.IntSafe(
(*config.Config)(x),
"init_worker_count",
)
if d > 0 {
return int(d)
}
return InitWorkerCountDefault
}
// InitInAdvance returns the value of "init_in_advance" config parameter.
//
// Returns False if the value is not defined or invalid.
func (x *Config) InitInAdvance() bool {
return config.BoolSafe(
(*config.Config)(x),
"init_in_advance",
)
}

View file

@ -92,6 +92,7 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
# Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
## 0 shard
### Flag to refill Metabase from BlobStor
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
@ -123,6 +124,8 @@ FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH=1
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH=4
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_LEAF_WIDTH=10
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_WORKER_COUNT=10
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_IN_ADVANCE=TRUE
### FSTree config
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE=fstree
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH=tmp/0/blob

View file

@ -137,6 +137,7 @@
"storage": {
"shard_pool_size": 15,
"shard_ro_error_threshold": 100,
"rebuild_workers_count": 1000,
"shard": {
"0": {
"mode": "read-only",
@ -170,7 +171,9 @@
"depth": 1,
"width": 4,
"opened_cache_capacity": 50,
"leaf_width": 10
"leaf_width": 10,
"init_worker_count": 10,
"init_in_advance": true
},
{
"type": "fstree",

View file

@ -116,6 +116,7 @@ storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
rebuild_workers_count: 1000 # count of rebuild storage concurrent workers
shard:
default: # section with the default shard parameters
@ -182,6 +183,8 @@ storage:
blobstor:
- type: blobovnicza
path: tmp/0/blob/blobovnicza
init_worker_count: 10 #count of workers to initialize blobovniczas
init_in_advance: true
- type: fstree
path: tmp/0/blob # blobstor path

View file

@ -512,4 +512,36 @@ const (
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
FailedToCountWritecacheItems = "failed to count writecache items"
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
FailedToRebuildBlobstore = "failed to rebuild blobstore"
BlobstoreRebuildStarted = "blobstore rebuild started"
BlobstoreRebuildCompletedSuccessfully = "blobstore rebuild completed successfully"
BlobstoreRebuildStopped = "blobstore rebuild stopped"
BlobovniczaTreeFixingFileExtensions = "fixing blobovnicza tree file extensions..."
BlobovniczaTreeFixingFileExtensionsCompletedSuccessfully = "fixing blobovnicza tree file extensions completed successfully"
BlobovniczaTreeFixingFileExtensionsFailed = "failed to fix blobovnicza tree file extensions"
BlobovniczaTreeFixingFileExtensionForFile = "fixing blobovnicza file extension..."
BlobovniczaTreeFixingFileExtensionCompletedSuccessfully = "fixing blobovnicza file extension completed successfully"
BlobovniczaTreeFixingFileExtensionFailed = "failed to fix blobovnicza file extension"
BlobstorRebuildFailedToRebuildStorages = "failed to rebuild storages"
BlobstorRebuildRebuildStoragesCompleted = "storages rebuild completed"
BlobovniczaTreeCollectingDBToRebuild = "collecting blobovniczas to rebuild..."
BlobovniczaTreeCollectingDBToRebuildFailed = "collecting blobovniczas to rebuild failed"
BlobovniczaTreeCollectingDBToRebuildSuccess = "collecting blobovniczas to rebuild completed successfully"
BlobovniczaTreeRebuildingBlobovnicza = "rebuilding blobovnicza..."
BlobovniczaTreeRebuildingBlobovniczaFailed = "rebuilding blobovnicza failed"
BlobovniczaTreeRebuildingBlobovniczaSuccess = "rebuilding blobovnicza completed successfully"
BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza = "could not put move info to source blobovnicza"
BlobovniczatreeCouldNotUpdateStorageID = "could not update storage ID"
BlobovniczatreeCouldNotDropMoveInfo = "could not drop move info from source blobovnicza"
BlobovniczatreeCouldNotDeleteFromSource = "could not delete object from source blobovnicza"
BlobovniczaTreeCompletingPreviousRebuild = "completing previous rebuild if failed..."
BlobovniczaTreeCompletedPreviousRebuildSuccess = "previous rebuild completed successfully"
BlobovniczaTreeCompletedPreviousRebuildFailed = "failed to complete previous rebuild"
BlobovniczatreeCouldNotCheckExistenceInSourceDB = "could not check object existence in source blobovnicza"
BlobovniczatreeCouldNotCheckExistenceInTargetDB = "could not check object existence in target blobovnicza"
BlobovniczatreeCouldNotGetObjectFromSourceDB = "could not get object from source blobovnicza"
BlobovniczatreeCouldNotPutObjectToTargetDB = "could not put object to target blobovnicza"
BlobovniczaSavingCountersToMeta = "saving counters to blobovnicza's meta..."
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
)

View file

@ -104,17 +104,42 @@ func (b *Blobovnicza) Init() error {
func (b *Blobovnicza) initializeCounters() error {
var size uint64
var items uint64
var sizeExists bool
var itemsCountExists bool
err := b.boltDB.View(func(tx *bbolt.Tx) error {
return b.iterateAllBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
keysN := uint64(b.Stats().KeyN)
size += keysN * upper
items += keysN
return false, nil
size, sizeExists = hasDataSize(tx)
items, itemsCountExists = hasItemsCount(tx)
if sizeExists && itemsCountExists {
return nil
}
return b.iterateAllDataBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
return false, b.ForEach(func(k, v []byte) error {
size += uint64(len(k) + len(v))
items++
return nil
})
})
})
if err != nil {
return fmt.Errorf("can't determine DB size: %w", err)
}
if (!sizeExists || !itemsCountExists) && !b.boltOptions.ReadOnly {
b.log.Debug(logs.BlobovniczaSavingCountersToMeta, zap.Uint64("size", size), zap.Uint64("items", items))
if err := b.boltDB.Update(func(tx *bbolt.Tx) error {
if err := saveDataSize(tx, size); err != nil {
return err
}
return saveItemsCount(tx, items)
}); err != nil {
b.log.Debug(logs.BlobovniczaSavingCountersToMetaFailed, zap.Uint64("size", size), zap.Uint64("items", items))
return fmt.Errorf("can't save blobovnicza's size and items count: %w", err)
}
b.log.Debug(logs.BlobovniczaSavingCountersToMetaSuccess, zap.Uint64("size", size), zap.Uint64("items", items))
}
b.dataSize.Store(size)
b.itemsCount.Store(items)
b.metrics.AddOpenBlobovniczaSize(size)

View file

@ -49,9 +49,10 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
var sizeUpperBound uint64
var sizeLowerBound uint64
var dataSize uint64
var recordSize uint64
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
return b.iterateAllBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
err := b.iterateAllDataBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
objData := buck.Get(addrKey)
if objData == nil {
// object is not in bucket => continue iterating
@ -60,9 +61,27 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
dataSize = uint64(len(objData))
sizeLowerBound = lower
sizeUpperBound = upper
recordSize = dataSize + uint64(len(addrKey))
found = true
return true, buck.Delete(addrKey)
})
if err != nil {
return err
}
if found {
return updateMeta(tx, func(count, size uint64) (uint64, uint64) {
if count > 0 {
count--
}
if size >= recordSize {
size -= recordSize
} else {
size = 0
}
return count, size
})
}
return nil
})
if err == nil && !found {
@ -74,7 +93,7 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
zap.String("binary size", stringifyByteSize(dataSize)),
zap.String("range", stringifyBounds(sizeLowerBound, sizeUpperBound)),
)
b.itemDeleted(sizeUpperBound)
b.itemDeleted(recordSize)
}
return DeleteRes{}, err

View file

@ -26,7 +26,10 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
addrKey := addressKey(addr)
err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
if isNonDataBucket(bucketName) {
return nil
}
exists = buck.Get(addrKey) != nil
if exists {
return errInterruptForEach

View file

@ -57,7 +57,11 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
)
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
if isNonDataBucket(bucketName) {
return nil
}
data = buck.Get(addrKey)
if data == nil {
return nil

View file

@ -12,11 +12,11 @@ import (
"go.opentelemetry.io/otel/trace"
)
// iterateAllBuckets iterates all buckets in db
// iterateAllDataBuckets iterates all buckets in db
//
// If the maximum size of the object (b.objSizeLimit) has been changed to lower value,
// then there may be more buckets than the current limit of the object size.
func (b *Blobovnicza) iterateAllBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
func (b *Blobovnicza) iterateAllDataBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) {
buck := tx.Bucket(key)
if buck == nil {
@ -139,7 +139,10 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
var elem IterationElement
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(name []byte, buck *bbolt.Bucket) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
if isNonDataBucket(bucketName) {
return nil
}
return buck.ForEach(func(k, v []byte) error {
select {
case <-ctx.Done():

View file

@ -0,0 +1,103 @@
package blobovnicza
import (
"bytes"
"encoding/binary"
"go.etcd.io/bbolt"
)
const (
dataSizeAndItemsCountBufLength = 8
)
var (
metaBucketName = []byte("META")
dataSizeKey = []byte("data_size")
itemsCountKey = []byte("items_count")
)
func isNonDataBucket(bucketName []byte) bool {
return bytes.Equal(bucketName, incompletedMoveBucketName) || bytes.Equal(bucketName, metaBucketName)
}
func hasDataSize(tx *bbolt.Tx) (uint64, bool) {
b := tx.Bucket(metaBucketName)
if b == nil {
return 0, false
}
v := b.Get(dataSizeKey)
if v == nil {
return 0, false
}
if len(v) != dataSizeAndItemsCountBufLength {
return 0, false
}
return binary.LittleEndian.Uint64(v), true
}
func hasItemsCount(tx *bbolt.Tx) (uint64, bool) {
b := tx.Bucket(metaBucketName)
if b == nil {
return 0, false
}
v := b.Get(itemsCountKey)
if v == nil {
return 0, false
}
if len(v) != dataSizeAndItemsCountBufLength {
return 0, false
}
return binary.LittleEndian.Uint64(v), true
}
func saveDataSize(tx *bbolt.Tx, size uint64) error {
b, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
buf := make([]byte, dataSizeAndItemsCountBufLength)
binary.LittleEndian.PutUint64(buf, size)
return b.Put(dataSizeKey, buf)
}
func saveItemsCount(tx *bbolt.Tx, count uint64) error {
b, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
buf := make([]byte, dataSizeAndItemsCountBufLength)
binary.LittleEndian.PutUint64(buf, count)
return b.Put(itemsCountKey, buf)
}
func updateMeta(tx *bbolt.Tx, updateValues func(count, size uint64) (uint64, uint64)) error {
b, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
var count uint64
var size uint64
v := b.Get(itemsCountKey)
if v != nil {
count = binary.LittleEndian.Uint64(v)
}
v = b.Get(dataSizeKey)
if v != nil {
size = binary.LittleEndian.Uint64(v)
}
count, size = updateValues(count, size)
buf := make([]byte, dataSizeAndItemsCountBufLength)
binary.LittleEndian.PutUint64(buf, size)
if err := b.Put(dataSizeKey, buf); err != nil {
return err
}
binary.LittleEndian.PutUint64(buf, count)
return b.Put(itemsCountKey, buf)
}

View file

@ -0,0 +1,108 @@
package blobovnicza
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var incompletedMoveBucketName = []byte("INCOMPLETED_MOVE")
type MoveInfo struct {
Address oid.Address
TargetStorageID []byte
}
func (b *Blobovnicza) PutMoveInfo(ctx context.Context, prm MoveInfo) error {
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.PutMoveInfo",
trace.WithAttributes(
attribute.String("path", b.path),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("target_storage_id", string(prm.TargetStorageID)),
))
defer span.End()
key := addressKey(prm.Address)
return b.boltDB.Update(func(tx *bbolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(incompletedMoveBucketName)
if err != nil {
return err
}
if err := bucket.Put(key, prm.TargetStorageID); err != nil {
return fmt.Errorf("(%T) failed to save move info: %w", b, err)
}
return nil
})
}
func (b *Blobovnicza) DropMoveInfo(ctx context.Context, address oid.Address) error {
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.DropMoveInfo",
trace.WithAttributes(
attribute.String("path", b.path),
attribute.String("address", address.EncodeToString()),
))
defer span.End()
key := addressKey(address)
return b.boltDB.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(incompletedMoveBucketName)
if bucket == nil {
return nil
}
if err := bucket.Delete(key); err != nil {
fyrchik marked this conversation as resolved Outdated

Maybe also delete the bucket if it is empty?

Maybe also delete the bucket if it is empty?

fixed

fixed
return fmt.Errorf("(%T) failed to drop move info: %w", b, err)
}
c := bucket.Cursor()
k, v := c.First()
bucketEmpty := k == nil && v == nil
if bucketEmpty {
return tx.DeleteBucket(incompletedMoveBucketName)
}
return nil
})
}
func (b *Blobovnicza) ListMoveInfo(ctx context.Context) ([]MoveInfo, error) {
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.ListMoveInfo",
trace.WithAttributes(
attribute.String("path", b.path),
))
defer span.End()
var result []MoveInfo
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(incompletedMoveBucketName)
fyrchik marked this conversation as resolved Outdated

Why not err?

Why not `err`?

fixed

fixed
if bucket == nil {
return nil
}
return bucket.ForEach(func(k, v []byte) error {
var addr oid.Address
storageID := make([]byte, len(v))
if err := addressFromKey(&addr, k); err != nil {
return err
}
copy(storageID, v)
result = append(result, MoveInfo{
Address: addr,
TargetStorageID: storageID,
})
return nil
})
}); err != nil {
return nil, err
}
return result, nil
}

View file

@ -57,8 +57,9 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
defer span.End()
sz := uint64(len(prm.objData))
bucketName, upperBound := bucketForSize(sz)
bucketName := bucketForSize(sz)
key := addressKey(prm.addr)
recordSize := sz + uint64(len(key))
err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
buck := tx.Bucket(bucketName)
@ -74,10 +75,12 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
return fmt.Errorf("(%T) could not save object in bucket: %w", b, err)
}
return nil
return updateMeta(tx, func(count, size uint64) (uint64, uint64) {
return count + 1, size + recordSize
})
})
if err == nil {
b.itemAdded(upperBound)
b.itemAdded(recordSize)
}
return PutRes{}, err

View file

@ -29,9 +29,8 @@ func bucketKeyFromBounds(upperBound uint64) []byte {
return buf[:ln]
}
func bucketForSize(sz uint64) ([]byte, uint64) {
upperBound := upperPowerOfTwo(sz)
return bucketKeyFromBounds(upperBound), upperBound
func bucketForSize(sz uint64) []byte {
return bucketKeyFromBounds(upperPowerOfTwo(sz))
}
func upperPowerOfTwo(v uint64) uint64 {

View file

@ -34,7 +34,7 @@ func TestSizes(t *testing.T) {
upperBound: 4 * firstBucketBound,
},
} {
key, _ := bucketForSize(item.sz)
key := bucketForSize(item.sz)
require.Equal(t, bucketKeyFromBounds(item.upperBound), key)
}
}

View file

@ -21,8 +21,8 @@ func (db *activeDB) Close() {
db.shDB.Close()
}
func (db *activeDB) Path() string {
return db.shDB.Path()
func (db *activeDB) SystemPath() string {

Why this change?

Why this change?

blobovniczatree uses concept path as internal path inside tree (like 0/0/0.db), but activeDB returns system path (like /storage/blobovnicza/0/0/0.db). So i think SystemPath() is more understandable.

`blobovniczatree` uses concept `path` as internal path inside tree (like `0/0/0.db`), but activeDB returns system path (like `/storage/blobovnicza/0/0/0.db`). So i think `SystemPath()` is more understandable.
return db.shDB.SystemPath()
}
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
@ -154,7 +154,7 @@ func (m *activeDBManager) getNextSharedDB(lvlPath string) (*sharedDB, error) {
var next *sharedDB
for iterCount < m.leafWidth {
path := filepath.Join(lvlPath, u64ToHexString(idx))
path := filepath.Join(lvlPath, u64ToHexStringExt(idx))
shDB := m.dbManager.GetByPath(path)
db, err := shDB.Open() //open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close()
if err != nil {
@ -192,7 +192,7 @@ func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
if !ok {
return false, 0
}
return true, u64FromHexString(filepath.Base(db.Path()))
return true, u64FromHexString(filepath.Base(db.SystemPath()))
}
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {

View file

@ -4,6 +4,8 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
@ -54,15 +56,22 @@ import (
type Blobovniczas struct {
cfg
commondbManager *dbManager
activeDBManager *activeDBManager
dbCache *dbCache
commondbManager *dbManager
activeDBManager *activeDBManager
dbCache *dbCache
deleteProtectedObjects *addressMap
dbFilesGuard *sync.RWMutex
rebuildGuard *sync.RWMutex
}
var _ common.Storage = (*Blobovniczas)(nil)
var errPutFailed = errors.New("could not save the object in any blobovnicza")
const (
dbExtension = ".db"
)
// NewBlobovniczaTree returns new instance of blobovniczas tree.
func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
blz = new(Blobovniczas)
@ -76,9 +85,12 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
blz.blzLeafWidth = blz.blzShallowWidth
}
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.blzLeafWidth, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
blz.deleteProtectedObjects = newAddressMap()
blz.dbFilesGuard = &sync.RWMutex{}
blz.rebuildGuard = &sync.RWMutex{}
return blz
}
@ -94,14 +106,16 @@ func addressHash(addr *oid.Address, path string) uint64 {
return hrw.StringHash(a + path)
}
// converts uint64 to hex string.
func u64ToHexString(ind uint64) string {
return strconv.FormatUint(ind, 16)
}
// converts uint64 hex string to uint64.
func u64ToHexStringExt(ind uint64) string {
return strconv.FormatUint(ind, 16) + dbExtension
}
func u64FromHexString(str string) uint64 {
v, err := strconv.ParseUint(str, 16, 64)
v, err := strconv.ParseUint(strings.TrimSuffix(str, dbExtension), 16, 64)
if err != nil {
panic(fmt.Sprintf("blobovnicza name is not an index %s", str))
}

View file

@ -15,8 +15,9 @@ import (
type dbCache struct {
cacheGuard *sync.RWMutex
cache simplelru.LRUCache[string, *sharedDB]
pathLock *utilSync.KeyLocker[string]
pathLock *utilSync.KeyLocker[string] // the order of locks is important: pathLock first, cacheGuard second
closed bool
nonCached map[string]struct{}
dbManager *dbManager
}
@ -34,6 +35,7 @@ func newDBCache(size int, dbManager *dbManager) *dbCache {
cache: cache,
dbManager: dbManager,
pathLock: utilSync.NewKeyLocker[string](),
nonCached: make(map[string]struct{}),
}
}
@ -59,6 +61,27 @@ func (c *dbCache) GetOrCreate(path string) *sharedDB {
return c.create(path)
}
func (c *dbCache) EvictAndMarkNonCached(path string) {
c.pathLock.Lock(path)

Can we move ordering comments to the field definitions?
inline comments could easily be missed when writing new code.

Can we move ordering comments to the field definitions? inline comments could easily be missed when writing new code.

fixed

fixed
defer c.pathLock.Unlock(path)
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
c.cache.Remove(path)
c.nonCached[path] = struct{}{}
}
func (c *dbCache) RemoveFromNonCached(path string) {
c.pathLock.Lock(path)
defer c.pathLock.Unlock(path)
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
delete(c.nonCached, path)
}
func (c *dbCache) getExisted(path string) *sharedDB {
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
@ -94,7 +117,9 @@ func (c *dbCache) put(path string, db *sharedDB) bool {
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
if !c.closed {
_, isNonCached := c.nonCached[path]
if !isNonCached && !c.closed {
c.cache.Add(path, db)
return true
}

View file

@ -2,15 +2,24 @@ package blobovniczatree
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var errFailedToChangeExtensionReadOnly = errors.New("failed to change blobovnicza extension: read only mode")
// Open opens blobovnicza tree.
func (b *Blobovniczas) Open(readOnly bool) error {
b.readOnly = readOnly
b.metrics.SetMode(readOnly)
b.metrics.SetRebuildStatus(rebuildStatusNotStarted)
b.openManagers()
return nil
}
@ -21,33 +30,95 @@ func (b *Blobovniczas) Open(readOnly bool) error {
func (b *Blobovniczas) Init() error {
b.log.Debug(logs.BlobovniczatreeInitializingBlobovniczas)
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensions)
if err := b.addDBExtensionToDBs(b.rootPath, 0); err != nil {
b.log.Error(logs.BlobovniczaTreeFixingFileExtensionsFailed, zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionsCompletedSuccessfully)
if b.readOnly {
b.log.Debug(logs.BlobovniczatreeReadonlyModeSkipBlobovniczasInitialization)
return nil
}
return b.iterateLeaves(context.TODO(), func(p string) (bool, error) {
shBlz := b.getBlobovniczaWithoutCaching(p)
_, err := shBlz.Open()
if err != nil {
return true, err
}
defer shBlz.Close()
return b.initializeDBs(context.TODO())
}
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
err := util.MkdirAllX(b.rootPath, b.perm)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(b.blzInitWorkerCount)
visited := make(map[string]struct{})

Why not map[string]struct{}?

Why not `map[string]struct{}`?

fixed

fixed
err = b.iterateExistingDBPaths(egCtx, func(p string) (bool, error) {
visited[p] = struct{}{}
eg.Go(func() error {
shBlz := b.getBlobovniczaWithoutCaching(p)
blz, err := shBlz.Open()
if err != nil {
return err
}
defer shBlz.Close()
moveInfo, err := blz.ListMoveInfo(egCtx)
if err != nil {
return err
}
for _, move := range moveInfo {
b.deleteProtectedObjects.Add(move.Address)
}
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return nil
})
return false, nil
})
if err != nil {
_ = eg.Wait()
return err
}
if b.createDBInAdvance {
err = b.iterateSortedLeaves(egCtx, nil, func(p string) (bool, error) {
if _, found := visited[p]; found {
return false, nil
}
eg.Go(func() error {
shBlz := b.getBlobovniczaWithoutCaching(p)
_, err := shBlz.Open()
if err != nil {
return err
}
defer shBlz.Close()

SuccessfullyInitialized, but we only call Open without Init, is this expected?

`SuccessfullyInitialized`, but we only call `Open` without `Init`, is this expected?

shBlz is wrapper over blobovnicza, so shBlz.Open() calls blobovnicza's Open() and Init().

`shBlz` is wrapper over `blobovnicza`, so `shBlz.Open()` calls blobovnicza's `Open()` and `Init()`.
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return nil
})
return false, nil
})
if err != nil {
_ = eg.Wait()
return err
}
}
return eg.Wait()
}
func (b *Blobovniczas) openManagers() {
b.commondbManager.Open() //order important
b.commondbManager.Open() // order important
b.activeDBManager.Open()
b.dbCache.Open()
}
// Close implements common.Storage.
func (b *Blobovniczas) Close() error {
b.dbCache.Close() //order important
b.dbCache.Close() // order important
b.activeDBManager.Close()
b.commondbManager.Close()
@ -64,3 +135,37 @@ func (b *Blobovniczas) getBlobovnicza(p string) *sharedDB {
func (b *Blobovniczas) getBlobovniczaWithoutCaching(p string) *sharedDB {
return b.commondbManager.GetByPath(p)
}
func (b *Blobovniczas) addDBExtensionToDBs(path string, depth uint64) error {
entries, err := os.ReadDir(path)
if os.IsNotExist(err) && depth == 0 {
return nil
}
for _, entry := range entries {
if entry.IsDir() {
if err := b.addDBExtensionToDBs(filepath.Join(path, entry.Name()), depth+1); err != nil {
return err

I like continue + smaller indentation for else better, but don't insist.

I like `continue` + smaller indentation for `else` better, but don't insist.

fixed

fixed
}
continue
}
if strings.HasSuffix(entry.Name(), dbExtension) {
continue
}
if b.readOnly {
return errFailedToChangeExtensionReadOnly
}
sourcePath := filepath.Join(path, entry.Name())
targetPath := filepath.Join(path, entry.Name()+dbExtension)
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionForFile, zap.String("source", sourcePath), zap.String("target", targetPath))
if err := os.Rename(sourcePath, targetPath); err != nil {
b.log.Error(logs.BlobovniczaTreeFixingFileExtensionFailed, zap.String("source", sourcePath), zap.String("target", targetPath), zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionCompletedSuccessfully, zap.String("source", sourcePath), zap.String("target", targetPath))
}
return nil
}

View file

@ -0,0 +1,189 @@
package blobovniczatree
import (
"context"
"os"
"path/filepath"
"strings"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"github.com/stretchr/testify/require"
)
func TestDBExtensionFix(t *testing.T) {
root := t.TempDir()
createTestTree(t, 0, 2, 3, root)
t.Run("adds suffix if not exists", func(t *testing.T) {
openAndCloseTestTree(t, 2, 3, root)
validateTestTree(t, root)
})
t.Run("not adds second suffix if exists", func(t *testing.T) {
openAndCloseTestTree(t, 2, 3, root)
validateTestTree(t, root)
})
}
func createTestTree(t *testing.T, currentDepth, depth, width uint64, path string) {
if currentDepth == depth {
var w uint64
for ; w < width; w++ {
dbPath := filepath.Join(path, u64ToHexString(w))
b := blobovnicza.New(blobovnicza.WithPath(dbPath))
require.NoError(t, b.Open())
require.NoError(t, b.Init())
require.NoError(t, b.Close())
}
return
}
var w uint64
for ; w < width; w++ {
createTestTree(t, currentDepth+1, depth, width, filepath.Join(path, u64ToHexString(w)))
}
}
func openAndCloseTestTree(t *testing.T, depth, width uint64, path string) {
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(depth),
WithBlobovniczaShallowWidth(width),
WithRootPath(path),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
require.NoError(t, blz.Close())
}
func validateTestTree(t *testing.T, path string) {
entries, err := os.ReadDir(path)
require.NoError(t, err)
for _, entry := range entries {
if entry.IsDir() {
validateTestTree(t, filepath.Join(path, entry.Name()))
} else {
require.True(t, strings.HasSuffix(entry.Name(), dbExtension))
require.False(t, strings.HasSuffix(strings.TrimSuffix(entry.Name(), dbExtension), dbExtension))
}
}
}
func TestObjectsAvailableAfterDepthAndWidthEdit(t *testing.T) {
fyrchik marked this conversation as resolved Outdated

Typo With -> Width.

Typo `With -> Width`.

fixed

fixed
t.Parallel()
rootDir := t.TempDir()
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
obj35 := blobstortest.NewObject(10 * 1024)
addr35 := objectCore.AddressOf(obj35)
raw, err := obj35.Marshal()
require.NoError(t, err)
pRes35, err := blz.Put(context.Background(), common.PutPrm{
Address: addr35,
Object: obj35,
RawData: raw,
})
require.NoError(t, err)
gRes, err := blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
require.NoError(t, blz.Close())
// change depth and width
blz = NewBlobovniczaTree(
WithBlobovniczaShallowDepth(5),
WithBlobovniczaShallowWidth(2),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
obj52 := blobstortest.NewObject(10 * 1024)
addr52 := objectCore.AddressOf(obj52)
raw, err = obj52.Marshal()
require.NoError(t, err)
pRes52, err := blz.Put(context.Background(), common.PutPrm{
Address: addr52,
Object: obj52,
RawData: raw,
})
require.NoError(t, err)
require.NoError(t, blz.Close())
// change depth and width back
blz = NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr52,
StorageID: pRes52.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj52, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr52,
})
require.NoError(t, err)
require.EqualValues(t, obj52, gRes.Object)
require.NoError(t, blz.Close())
}

View file

@ -3,6 +3,7 @@ package blobovniczatree
import (
"context"
"encoding/hex"
"errors"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -17,6 +18,8 @@ import (
"go.uber.org/zap"
)
var errObjectIsDeleteProtected = errors.New("object is delete protected")
// Delete deletes object from blobovnicza tree.
//
// If blobocvnicza ID is specified, only this blobovnicza is processed.
@ -42,12 +45,22 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
return common.DeleteRes{}, common.ErrReadOnly
}
if b.rebuildGuard.TryRLock() {

TryRLock is not recommended, I would like to avoid it if possible. Can we do the same with atomics?

`TryRLock` is not recommended, I would like to avoid it if possible. Can we do the same with atomics?

I don't know how to correct do the same with atomics. Now Delete call can perform concurrently , but can't perform concurrently with Rebuild.
So i believe that this is exactly the case when the use of TryLock is justified.

I don't know how to correct do the same with atomics. Now `Delete` call can perform concurrently , but can't perform concurrently with Rebuild. So i believe that this is exactly the case when the use of TryLock is justified.
defer b.rebuildGuard.RUnlock()
} else {
return common.DeleteRes{}, errRebuildInProgress
}
if b.deleteProtectedObjects.Contains(prm.Address) {
return common.DeleteRes{}, errObjectIsDeleteProtected
}
var bPrm blobovnicza.DeletePrm
bPrm.SetAddress(prm.Address)
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String())
id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open()
if err != nil {
return res, err
@ -62,7 +75,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
objectFound := false
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.deleteObjectFromLevel(ctx, bPrm, p)
if err != nil {
if !client.IsErrObjectNotFound(err) {

View file

@ -35,8 +35,8 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
defer span.End()
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String())
id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open()
if err != nil {
return common.ExistsRes{}, err
@ -50,7 +50,7 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(prm.Address)
err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err := b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
_, err := b.getObjectFromLevel(ctx, gPrm, p)
if err != nil {
if !client.IsErrObjectNotFound(err) {

View file

@ -55,7 +55,7 @@ func TestExistsInvalidStorageID(t *testing.T) {
// An invalid boltdb file is created so that it returns an error when opened
require.NoError(t, os.MkdirAll(filepath.Join(dir, relBadFileDir), os.ModePerm))
require.NoError(t, os.WriteFile(filepath.Join(dir, relBadFileDir, badFileName), []byte("not a boltdb file content"), 0777))
require.NoError(t, os.WriteFile(filepath.Join(dir, relBadFileDir, badFileName+".db"), []byte("not a boltdb file content"), 0777))
res, err := b.Exists(context.Background(), common.ExistsPrm{Address: addr, StorageID: []byte(filepath.Join(relBadFileDir, badFileName))})
require.Error(t, err)

View file

@ -46,8 +46,8 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
bPrm.SetAddress(prm.Address)
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String())
id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open()
if err != nil {
return res, err
@ -62,7 +62,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
return res, err
}
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.getObjectFromLevel(ctx, bPrm, p)
if err != nil {
if !client.IsErrObjectNotFound(err) {

View file

@ -45,8 +45,8 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
defer span.End()
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String())
id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open()
if err != nil {
return common.GetRangeRes{}, err
@ -63,7 +63,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
objectFound := false
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.getRangeFromLevel(ctx, prm, p)
if err != nil {
outOfBounds := isErrOutOfRange(err)

View file

@ -1,4 +1,4 @@
package blobovnicza
package blobovniczatree
// ID represents Blobovnicza identifier.
type ID []byte
@ -8,8 +8,8 @@ func NewIDFromBytes(v []byte) *ID {
return (*ID)(&v)
}
func (id ID) String() string {
return string(id)
func (id ID) Path() string {
return string(id) + dbExtension
}
func (id ID) Bytes() []byte {

View file

@ -3,7 +3,9 @@ package blobovniczatree
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
@ -50,7 +52,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
return prm.Handler(common.IterationElement{
Address: elem.Address(),
ObjectData: data,
StorageID: []byte(p),
StorageID: []byte(strings.TrimSuffix(p, dbExtension)),
})
}
return prm.LazyHandler(elem.Address(), func() ([]byte, error) {
@ -67,7 +69,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
return b.iterateLeaves(ctx, func(p string) (bool, error) {
return b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
shBlz := b.getBlobovnicza(p)
blz, err := shBlz.Open()
if err != nil {
@ -84,7 +86,9 @@ func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors boo
})
}
// iterator over the paths of Blobovniczas sorted by weight.
// iterateSortedLeaves iterates over the paths of Blobovniczas sorted by weight.
//
// Uses depth, width and leaf width for iteration.
func (b *Blobovniczas) iterateSortedLeaves(ctx context.Context, addr *oid.Address, f func(string) (bool, error)) error {
_, err := b.iterateSorted(
ctx,
@ -124,7 +128,9 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
}
indices := indexSlice(levelWidth)
hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...)))
if !isLeafLevel {
hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...)))
}
exec := uint64(len(curPath)) == execDepth
@ -134,10 +140,16 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
return false, ctx.Err()
default:
}
lastPart := u64ToHexString(indices[i])
if isLeafLevel {
lastPart = u64ToHexStringExt(indices[i])
}
if i == 0 {
curPath = append(curPath, u64ToHexString(indices[i]))
curPath = append(curPath, lastPart)
} else {
curPath[len(curPath)-1] = u64ToHexString(indices[i])
curPath[len(curPath)-1] = lastPart
}
if exec {
@ -156,9 +168,110 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
return false, nil
}
fyrchik marked this conversation as resolved Outdated

Existed or Existing?

`Existed` or `Existing`?

fixed

fixed
// iterator over the paths of Blobovniczas in random order.
func (b *Blobovniczas) iterateLeaves(ctx context.Context, f func(string) (bool, error)) error {
return b.iterateSortedLeaves(ctx, nil, f)
// iterateExistingDBPaths iterates over the paths of Blobovniczas without any order.
//
// Uses existed blobovnicza files for iteration.
func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string) (bool, error)) error {
b.dbFilesGuard.RLock()
defer b.dbFilesGuard.RUnlock()
_, err := b.iterateExistingDBPathsDFS(ctx, "", f)

Why not move this locking to the caller?

Why not move this locking to the caller?

Fixed

Fixed
return err
}
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) {
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { //non initialized tree in read only mode

Why do we NOT return error in this case? Was it the case in the previous implementation?

Why do we NOT return error in this case? Was it the case in the previous implementation?

DB files and dirs may be missing in case of read only mode.

https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/627b3027456fc87605d9a82490088cd2c767dbe4/pkg/local_object_storage/blobstor/blobovniczatree/control.go#L24 DB files and dirs may be missing in case of read only mode.
return false, nil
}
if err != nil {
return false, err
}
for _, entry := range entries {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
if entry.IsDir() {
stop, err := b.iterateExistingDBPathsDFS(ctx, filepath.Join(path, entry.Name()), f)
if err != nil {
return false, err
}
if stop {
return true, nil
}
} else {
stop, err := f(filepath.Join(path, entry.Name()))
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
return false, nil
}
func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error {

iterateSortedDBPaths iterates over ALL databases
iterateSortedLeaves iterates over databases according to config

Am I correct?

`iterateSortedDBPaths` iterates over ALL databases `iterateSortedLeaves` iterates over databases according to config Am I correct?

Yes

Yes
b.dbFilesGuard.RLock()
defer b.dbFilesGuard.RUnlock()
_, err := b.iterateSordedDBPathsInternal(ctx, "", addr, f)
return err

s/Sorder/Sorted/ ?

Actually, Sorted looks like a nice abbreviation for "sorted order" :)

s/Sorder/Sorted/ ? Actually, `Sorted` looks like a nice abbreviation for "sorted order" :)

fixed

fixed
}
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { //non initialized tree in read only mode
return false, nil
}
if err != nil {
return false, err
}
var dbIdxs []uint64
var dirIdxs []uint64
for _, entry := range entries {
idx := u64FromHexString(entry.Name())
if entry.IsDir() {

If we iterate during rebuild the object could be iterated twice.
It doesn't seem like a big problem, but IMO worth mentioning somewhere in docs.

If we iterate during rebuild the object could be iterated twice. It doesn't seem like a big problem, but IMO worth mentioning somewhere in docs.
dirIdxs = append(dirIdxs, idx)
} else {
dbIdxs = append(dbIdxs, idx)
}
}
if len(dbIdxs) > 0 {
for _, dbIdx := range dbIdxs {
dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx))
stop, err := f(dbPath)
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
if len(dirIdxs) > 0 {
hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path))
for _, dirIdx := range dirIdxs {
dirPath := filepath.Join(path, u64ToHexString(dirIdx))
stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f)
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
return false, nil
}
// makes slice of uint64 values from 0 to number-1.

View file

@ -0,0 +1,42 @@
package blobovniczatree
import (
"context"
"testing"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func TestIterateSortedLeavesAndDBPathsAreSame(t *testing.T) {
t.Parallel()
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(t.TempDir()),
)
blz.createDBInAdvance = true
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
defer func() {
require.NoError(t, blz.Close())
}()
addr := oidtest.Address()
var leaves []string
var dbPaths []string
blz.iterateSortedLeaves(context.Background(), &addr, func(s string) (bool, error) {
leaves = append(leaves, s)
return false, nil
})
blz.iterateSortedDBPaths(context.Background(), addr, func(s string) (bool, error) {
dbPaths = append(dbPaths, s)
return false, nil
})
require.Equal(t, leaves, dbPaths)
}

View file

@ -1,7 +1,9 @@
package blobovniczatree
import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
@ -12,9 +14,13 @@ import (
"go.uber.org/zap"
)
var (
errClosingClosedBlobovnicza = errors.New("closing closed blobovnicza is not allowed")
)
// sharedDB is responsible for opening and closing a file of single blobovnicza.
type sharedDB struct {
guard *sync.RWMutex
cond *sync.Cond
blcza *blobovnicza.Blobovnicza
refCount uint32
@ -30,8 +36,9 @@ type sharedDB struct {
func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger) *sharedDB {
return &sharedDB{
guard: &sync.RWMutex{},
cond: &sync.Cond{
L: &sync.RWMutex{},
},
options: options,
path: path,
readOnly: readOnly,
@ -47,8 +54,8 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
return nil, errClosed
}
b.guard.Lock()
defer b.guard.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()
if b.refCount > 0 {
b.refCount++
@ -76,11 +83,12 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
}
func (b *sharedDB) Close() {
b.guard.Lock()
defer b.guard.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()
if b.refCount == 0 {
b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
b.cond.Broadcast()
return
}
@ -98,32 +106,108 @@ func (b *sharedDB) Close() {
}
b.refCount--
if b.refCount == 1 {

What scenario does this branch corresponds to?

What scenario does this branch corresponds to?

See

func (b *sharedDB) CloseAndRemoveFile() error {
	b.cond.L.Lock()
	if b.refCount > 1 {
		b.cond.Wait()
	}
	defer b.cond.L.Unlock()

This is where the caller waits on condition variable until there is only one reference left.

See ``` func (b *sharedDB) CloseAndRemoveFile() error { b.cond.L.Lock() if b.refCount > 1 { b.cond.Wait() } defer b.cond.L.Unlock() ``` This is where the caller waits on condition variable until there is only one reference left.
b.cond.Broadcast()
}
}
func (b *sharedDB) Path() string {
func (b *sharedDB) CloseAndRemoveFile() error {
b.cond.L.Lock()
if b.refCount > 1 {
b.cond.Wait()
}
defer b.cond.L.Unlock()
if b.refCount == 0 {
return errClosingClosedBlobovnicza
}
if err := b.blcza.Close(); err != nil {
b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
zap.String("id", b.path),
zap.String("error", err.Error()),
)
return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err)
}
b.refCount = 0
b.blcza = nil
b.openDBCounter.Dec()
return os.Remove(b.path)
}
func (b *sharedDB) SystemPath() string {
return b.path
}
// levelDbManager stores pointers of the sharedDB's for the leaf directory of the blobovnicza tree.
type levelDbManager struct {
databases []*sharedDB
dbMtx *sync.RWMutex
databases map[uint64]*sharedDB
options []blobovnicza.Option
path string
readOnly bool
metrics blobovnicza.Metrics
openDBCounter *openDBCounter
closedFlag *atomic.Bool
log *logger.Logger
}
func newLevelDBManager(width uint64, options []blobovnicza.Option, rootPath string, lvlPath string,
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlog *atomic.Bool, log *logger.Logger) *levelDbManager {
func newLevelDBManager(options []blobovnicza.Option, rootPath string, lvlPath string,
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger) *levelDbManager {
result := &levelDbManager{
databases: make([]*sharedDB, width),
}
for idx := uint64(0); idx < width; idx++ {
result.databases[idx] = newSharedDB(options, filepath.Join(rootPath, lvlPath, u64ToHexString(idx)), readOnly, metrics, openDBCounter, closedFlog, log)
databases: make(map[uint64]*sharedDB),
dbMtx: &sync.RWMutex{},
options: options,
path: filepath.Join(rootPath, lvlPath),
readOnly: readOnly,
metrics: metrics,
openDBCounter: openDBCounter,
closedFlag: closedFlag,
log: log,
}
return result
}
func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
res := m.getDBIfExists(idx)
if res != nil {
return res
}
return m.getOrCreateDB(idx)
}
func (m *levelDbManager) getDBIfExists(idx uint64) *sharedDB {
m.dbMtx.RLock()
defer m.dbMtx.RUnlock()
return m.databases[idx]
}
func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB {
m.dbMtx.Lock()
defer m.dbMtx.Unlock()
db := m.databases[idx]
if db != nil {
return db
}
db = newSharedDB(m.options, filepath.Join(m.path, u64ToHexStringExt(idx)), m.readOnly, m.metrics, m.openDBCounter, m.closedFlag, m.log)
m.databases[idx] = db
return db
}
func (m *levelDbManager) hasAnyDB() bool {
m.dbMtx.RLock()

Why is it Lock and not RLock?

Why is it `Lock` and not `RLock`?

fixed

fixed
defer m.dbMtx.RUnlock()
return len(m.databases) > 0
}
// dbManager manages the opening and closing of blobovnicza instances.
//
// The blobovnicza opens at the first request, closes after the last request.
@ -133,21 +217,19 @@ type dbManager struct {
closedFlag *atomic.Bool
dbCounter *openDBCounter
rootPath string
options []blobovnicza.Option
readOnly bool
metrics blobovnicza.Metrics
leafWidth uint64
log *logger.Logger
rootPath string
options []blobovnicza.Option
readOnly bool
metrics blobovnicza.Metrics
log *logger.Logger
}
func newDBManager(rootPath string, options []blobovnicza.Option, leafWidth uint64, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
func newDBManager(rootPath string, options []blobovnicza.Option, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
return &dbManager{
rootPath: rootPath,
options: options,
readOnly: readOnly,
metrics: metrics,
leafWidth: leafWidth,
levelToManager: make(map[string]*levelDbManager),
levelToManagerGuard: &sync.RWMutex{},
log: log,
@ -163,6 +245,17 @@ func (m *dbManager) GetByPath(path string) *sharedDB {
return levelManager.GetByIndex(curIndex)
}
func (m *dbManager) CleanResources(path string) {
lvlPath := filepath.Dir(path)
m.levelToManagerGuard.Lock()
defer m.levelToManagerGuard.Unlock()
if result, ok := m.levelToManager[lvlPath]; ok && !result.hasAnyDB() {
delete(m.levelToManager, lvlPath)
}
}
func (m *dbManager) Open() {
m.closedFlag.Store(false)
}
@ -195,7 +288,7 @@ func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager {
return result
}
result := newLevelDBManager(m.leafWidth, m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
result := newLevelDBManager(m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
m.levelToManager[lvlPath] = result
return result
}

View file

@ -6,6 +6,13 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
)
const (
rebuildStatusNotStarted = "not_started"
rebuildStatusRunning = "running"
rebuildStatusCompleted = "completed"
rebuildStatusFailed = "failed"
)
type Metrics interface {
Blobovnicza() blobovnicza.Metrics
@ -14,6 +21,10 @@ type Metrics interface {
SetMode(readOnly bool)
Close()
SetRebuildStatus(status string)
ObjectMoved(d time.Duration)
SetRebuildPercent(value uint32)
Delete(d time.Duration, success, withStorageID bool)
Exists(d time.Duration, success, withStorageID bool)
GetRange(d time.Duration, size int, success, withStorageID bool)
@ -27,6 +38,9 @@ type noopMetrics struct{}
func (m *noopMetrics) SetParentID(string) {}
func (m *noopMetrics) SetMode(bool) {}
func (m *noopMetrics) Close() {}
func (m *noopMetrics) SetRebuildStatus(string) {}
func (m *noopMetrics) SetRebuildPercent(uint32) {}
func (m *noopMetrics) ObjectMoved(time.Duration) {}
func (m *noopMetrics) Delete(time.Duration, bool, bool) {}
func (m *noopMetrics) Exists(time.Duration, bool, bool) {}
func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}

View file

@ -2,6 +2,7 @@ package blobovniczatree
import (
"io/fs"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
@ -10,39 +11,48 @@ import (
)
type cfg struct {
log *logger.Logger
perm fs.FileMode
readOnly bool
rootPath string
openedCacheSize int
blzShallowDepth uint64
blzShallowWidth uint64
blzLeafWidth uint64
compression *compression.Config
blzOpts []blobovnicza.Option
// reportError is the function called when encountering disk errors.
reportError func(string, error)
metrics Metrics
log *logger.Logger
perm fs.FileMode
readOnly bool
rootPath string
openedCacheSize int
blzShallowDepth uint64
blzShallowWidth uint64
blzLeafWidth uint64
compression *compression.Config
blzOpts []blobovnicza.Option
reportError func(string, error) // reportError is the function called when encountering disk errors.
metrics Metrics
waitBeforeDropDB time.Duration
blzInitWorkerCount int
blzMoveBatchSize int
createDBInAdvance bool
}
type Option func(*cfg)
const (
defaultPerm = 0700
defaultOpenedCacheSize = 50
defaultBlzShallowDepth = 2
defaultBlzShallowWidth = 16
defaultPerm = 0o700
defaultOpenedCacheSize = 50
defaultBlzShallowDepth = 2
defaultBlzShallowWidth = 16
defaultWaitBeforeDropDB = 10 * time.Second
defaultBlzInitWorkerCount = 5
defaulBlzMoveBatchSize = 10000
)
func initConfig(c *cfg) {
*c = cfg{
log: &logger.Logger{Logger: zap.L()},
perm: defaultPerm,
openedCacheSize: defaultOpenedCacheSize,
blzShallowDepth: defaultBlzShallowDepth,
blzShallowWidth: defaultBlzShallowWidth,
reportError: func(string, error) {},
metrics: &noopMetrics{},
log: &logger.Logger{Logger: zap.L()},
perm: defaultPerm,
openedCacheSize: defaultOpenedCacheSize,
blzShallowDepth: defaultBlzShallowDepth,
blzShallowWidth: defaultBlzShallowWidth,
reportError: func(string, error) {},
metrics: &noopMetrics{},
waitBeforeDropDB: defaultWaitBeforeDropDB,
blzInitWorkerCount: defaultBlzInitWorkerCount,
blzMoveBatchSize: defaulBlzMoveBatchSize,
}
}
@ -106,3 +116,34 @@ func WithMetrics(m Metrics) Option {
c.metrics = m
}
}
func WithWaitBeforeDropDB(t time.Duration) Option {
return func(c *cfg) {
c.waitBeforeDropDB = t
}
}
func WithMoveBatchSize(v int) Option {
return func(c *cfg) {
c.blzMoveBatchSize = v
}
}
// WithInitWorkersCount sets maximum workers count to init blobovnicza tree.
//
// Negative or zero value means no limit.
func WithInitWorkerCount(v int) Option {
if v <= 0 {
v = -1
}
return func(c *cfg) {
c.blzInitWorkerCount = v
}
}
// WithInitInAdvance returns an option to create blobovnicza tree DB's in advance.
func WithInitInAdvance(v bool) Option {
return func(c *cfg) {
c.createDBInAdvance = v
}
}

View file

@ -69,7 +69,7 @@ func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRe
type putIterator struct {
B *Blobovniczas
ID *blobovnicza.ID
ID *ID
AllFull bool
PutPrm blobovnicza.PutPrm
}
@ -101,15 +101,15 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
} else {
i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
zap.String("path", active.Path()),
zap.String("path", active.SystemPath()),
zap.String("error", err.Error()))
}
return false, nil
}
idx := u64FromHexString(filepath.Base(active.Path()))
i.ID = blobovnicza.NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))
idx := u64FromHexString(filepath.Base(active.SystemPath()))
i.ID = NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))
return true, nil
}

View file

@ -0,0 +1,478 @@
package blobovniczatree
import (
"bytes"
"context"
"errors"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
errBatchFull = errors.New("batch full")
)
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
if b.readOnly {
return common.RebuildRes{}, common.ErrReadOnly
}
b.metrics.SetRebuildStatus(rebuildStatusRunning)
b.metrics.SetRebuildPercent(0)
success := true
defer func() {
if success {
b.metrics.SetRebuildStatus(rebuildStatusCompleted)
} else {
b.metrics.SetRebuildStatus(rebuildStatusFailed)
}
}()
b.rebuildGuard.Lock()
defer b.rebuildGuard.Unlock()
var res common.RebuildRes
b.log.Debug(logs.BlobovniczaTreeCompletingPreviousRebuild)
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
res.ObjectsMoved += completedPreviosMoves
if err != nil {
b.log.Warn(logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
success = false
return res, err
}
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
dbsToMigrate, err := b.getDBsToRebuild(ctx)
if err != nil {
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
success = false
return res, err
}
b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
res, err = b.migrateDBs(ctx, dbsToMigrate, prm, res)
if err != nil {
success = false
}
return res, err
}
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
var completedDBCount uint32
for _, db := range dbs {
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
res.ObjectsMoved += movedObjects
if err != nil {
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
return res, err
}
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
res.FilesRemoved++
completedDBCount++
b.metrics.SetRebuildPercent((100 * completedDBCount) / uint32(len(dbs)))
}
b.metrics.SetRebuildPercent(100)
return res, nil
}
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
dbsToMigrate := make(map[string]struct{})
if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
dbsToMigrate[s] = struct{}{}
return false, nil
}); err != nil {
return nil, err
}
if err := b.iterateSortedLeaves(ctx, nil, func(s string) (bool, error) {
delete(dbsToMigrate, s)
return false, nil
}); err != nil {
return nil, err
}
result := make([]string, 0, len(dbsToMigrate))
for db := range dbsToMigrate {
result = append(result, db)
}
return result, nil
}
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
shDB := b.getBlobovnicza(path)
blz, err := shDB.Open()
if err != nil {
return 0, err
}
shDBClosed := false
defer func() {
if shDBClosed {
return
}
shDB.Close()
}()
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
if err != nil {
return migratedObjects, err
}
shDBClosed, err = b.dropDB(ctx, path, shDB)
return migratedObjects, err
}
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
var result atomic.Uint64
batch := make(map[oid.Address][]byte)
var prm blobovnicza.IteratePrm
prm.DecodeAddresses()
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
batch[ie.Address()] = bytes.Clone(ie.ObjectData())
if len(batch) == b.blzMoveBatchSize {
return errBatchFull
}
return nil
})
for {
_, err := blz.Iterate(ctx, prm)
if err != nil && !errors.Is(err, errBatchFull) {
return result.Load(), err
}
if len(batch) == 0 {
break
}
eg, egCtx := errgroup.WithContext(ctx)
for addr, data := range batch {
addr := addr
data := data
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
_ = eg.Wait()
return result.Load(), err
}
eg.Go(func() error {
defer limiter.ReleaseWorkSlot()
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
if err == nil {
result.Add(1)
}
return err
})
}
if err := eg.Wait(); err != nil {
return result.Load(), err
}
batch = make(map[oid.Address][]byte)
}
return result.Load(), nil
}
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
addr oid.Address, data []byte, metaStore common.MetaStorage) error {
startedAt := time.Now()
defer func() {
b.metrics.ObjectMoved(time.Since(startedAt))
}()
it := &moveIterator{
B: b,
ID: nil,
AllFull: true,
Address: addr,
ObjectData: data,
MetaStore: metaStore,
Source: source,
SourceSysPath: sourcePath,
}
if err := b.iterateDeepest(ctx, addr, func(lvlPath string) (bool, error) { return it.tryMoveToLvl(ctx, lvlPath) }); err != nil {
return err
} else if it.ID == nil {
if it.AllFull {
return common.ErrNoSpace
}
return errPutFailed
}
return nil
}
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
case <-time.After(b.waitBeforeDropDB): // to complete requests with old storage ID
}
b.dbCache.EvictAndMarkNonCached(path)
defer b.dbCache.RemoveFromNonCached(path)
b.dbFilesGuard.Lock()
defer b.dbFilesGuard.Unlock()
if err := shDb.CloseAndRemoveFile(); err != nil {
return false, err
}
b.commondbManager.CleanResources(path)
if err := b.dropDirectoryIfEmpty(filepath.Dir(path)); err != nil {
return true, err
}
return true, nil
}
func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
if path == "." {
return nil
}
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if err != nil {
return err
}
if len(entries) > 0 {
return nil
}
if err := os.Remove(sysPath); err != nil {
return err
}
return b.dropDirectoryIfEmpty(filepath.Dir(path))
}
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
var count uint64
return count, b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
shDB := b.getBlobovnicza(s)
blz, err := shDB.Open()
if err != nil {
return true, err
}
defer shDB.Close()
incompletedMoves, err := blz.ListMoveInfo(ctx)
if err != nil {
return true, err
}
for _, move := range incompletedMoves {
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
return true, err
}
count++
}
return false, nil
})
}
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
move blobovnicza.MoveInfo, metaStore common.MetaStorage) error {
targetDB := b.getBlobovnicza(NewIDFromBytes(move.TargetStorageID).Path())
target, err := targetDB.Open()
if err != nil {
return err
}
defer targetDB.Close()
existsInSource := true
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(move.Address)
gRes, err := source.Get(ctx, gPrm)
if err != nil {
if client.IsErrObjectNotFound(err) {
existsInSource = false
} else {
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
return err
}
}
if !existsInSource { //object was deleted by Rebuild, need to delete move info
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
return err
}
b.deleteProtectedObjects.Delete(move.Address)
return nil
}
existsInTarget, err := target.Exists(ctx, move.Address)
if err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
return err
}
if !existsInTarget {
var putPrm blobovnicza.PutPrm
putPrm.SetAddress(move.Address)
putPrm.SetMarshaledObject(gRes.Object())
_, err = target.Put(ctx, putPrm)
if err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToTargetDB, zap.String("path", targetDB.SystemPath()), zap.Error(err))
return err
}
}
if err = metaStore.UpdateStorageID(ctx, move.Address, move.TargetStorageID); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", move.Address))
if !client.IsErrObjectNotFound(err) {
return err
}
}
var deletePrm blobovnicza.DeletePrm
deletePrm.SetAddress(move.Address)
if _, err = source.Delete(ctx, deletePrm); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", sourcePath), zap.Error(err))
return err
}
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
return err
}
b.deleteProtectedObjects.Delete(move.Address)
return nil
}
type moveIterator struct {
B *Blobovniczas
ID *ID
AllFull bool
Address oid.Address
ObjectData []byte
MetaStore common.MetaStorage
Source *blobovnicza.Blobovnicza
SourceSysPath string
}
func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, error) {
target, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
if err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, zap.Error(err))
}
return false, nil
}
if target == nil {
i.B.log.Warn(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
return false, nil
}
defer target.Close()
i.AllFull = false
targetIDx := u64FromHexString(filepath.Base(target.SystemPath()))
targetStorageID := NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(targetIDx))))
if err = i.Source.PutMoveInfo(ctx, blobovnicza.MoveInfo{
Address: i.Address,
TargetStorageID: targetStorageID.Bytes(),
}); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
i.B.deleteProtectedObjects.Add(i.Address)
var putPrm blobovnicza.PutPrm
putPrm.SetAddress(i.Address)
putPrm.SetMarshaledObject(i.ObjectData)
_, err = target.Blobovnicza().Put(ctx, putPrm)
if err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, zap.String("path", target.SystemPath()), zap.Error(err))
}
return true, nil
}
if err = i.MetaStore.UpdateStorageID(ctx, i.Address, targetStorageID.Bytes()); err != nil {
i.B.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", i.Address))
return true, nil
}

Again, if we only store true here, what about map[oid.Address]struct{}?

Again, if we only store `true` here, what about `map[oid.Address]struct{}`?

fixed. It just shortens the code.

fixed. It just shortens the code.
var deletePrm blobovnicza.DeletePrm
deletePrm.SetAddress(i.Address)
if _, err = i.Source.Delete(ctx, deletePrm); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotDeleteFromSource, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
if err = i.Source.DropMoveInfo(ctx, i.Address); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotDropMoveInfo, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
i.B.deleteProtectedObjects.Delete(i.Address)
i.ID = targetStorageID
return true, nil
}
type addressMap struct {
data map[oid.Address]struct{}
guard *sync.RWMutex
}
func newAddressMap() *addressMap {
return &addressMap{
data: make(map[oid.Address]struct{}),
guard: &sync.RWMutex{},
}
}
func (m *addressMap) Add(address oid.Address) {
m.guard.Lock()
defer m.guard.Unlock()
m.data[address] = struct{}{}
}
func (m *addressMap) Delete(address oid.Address) {
m.guard.Lock()
defer m.guard.Unlock()
delete(m.data, address)
}
func (m *addressMap) Contains(address oid.Address) bool {
m.guard.RLock()
defer m.guard.RUnlock()
_, contains := m.data[address]
return contains
}

View file

@ -0,0 +1,195 @@
package blobovniczatree
import (
"bytes"
"context"
"path/filepath"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
func TestRebuildFailover(t *testing.T) {
t.Parallel()
t.Run("only move info saved", testRebuildFailoverOnlyMoveInfoSaved)
t.Run("object saved to target", testRebuildFailoverObjectSavedToTarget)
t.Run("object deleted from source", testRebuildFailoverObjectDeletedFromSource)
}
func testRebuildFailoverOnlyMoveInfoSaved(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, true)
}
func testRebuildFailoverObjectSavedToTarget(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, true)
}
func testRebuildFailoverObjectDeletedFromSource(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, false)
}
func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object, mustUpdateStorageID bool) {
b := NewBlobovniczaTree(
WithLogger(test.NewLogger(t, true)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(2),
WithBlobovniczaShallowDepth(2),
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
var dPrm common.DeletePrm
dPrm.Address = object.AddressOf(obj)
dPrm.StorageID = []byte("0/0/1")
_, err := b.Delete(context.Background(), dPrm)
require.ErrorIs(t, err, errObjectIsDeleteProtected)
metaStub := &storageIDUpdateStub{
storageIDs: make(map[oid.Address][]byte),
guard: &sync.Mutex{},
}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
})
require.NoError(t, err)
require.Equal(t, uint64(1), rRes.ObjectsMoved)
require.Equal(t, uint64(0), rRes.FilesRemoved)
require.NoError(t, b.Close())
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
moveInfo, err := blz.ListMoveInfo(context.Background())
require.NoError(t, err)
require.Equal(t, 0, len(moveInfo))
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(object.AddressOf(obj))
_, err = blz.Get(context.Background(), gPrm)
require.True(t, client.IsErrObjectNotFound(err))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
moveInfo, err = blz.ListMoveInfo(context.Background())
require.NoError(t, err)
require.Equal(t, 0, len(moveInfo))
gRes, err := blz.Get(context.Background(), gPrm)
require.NoError(t, err)
require.True(t, len(gRes.Object()) > 0)
if mustUpdateStorageID {
require.True(t, bytes.Equal([]byte("0/0/0"), metaStub.storageIDs[object.AddressOf(obj)]))
}
require.NoError(t, blz.Close())
}

View file

@ -0,0 +1,145 @@
package blobovniczatree
import (
"context"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestBlobovniczaTreeRebuild(t *testing.T) {
t.Parallel()
t.Run("width increased", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 3, false)
})
t.Run("width reduced", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 1, true)
})
t.Run("depth increased", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 1, 2, 2, 2, true)
})
t.Run("depth reduced", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 1, 2, true)
})
}
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
dir := t.TempDir()
b := NewBlobovniczaTree(
WithLogger(test.NewLogger(t, true)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(sourceWidth),
WithBlobovniczaShallowDepth(sourceDepth),
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
eg, egCtx := errgroup.WithContext(context.Background())
storageIDs := make(map[oid.Address][]byte)
storageIDsGuard := &sync.Mutex{}
for i := 0; i < 1000; i++ {
eg.Go(func() error {
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
if err != nil {
return err
}
var prm common.PutPrm
prm.Address = object.AddressOf(obj)
prm.RawData = data
res, err := b.Put(egCtx, prm)
if err != nil {
return err
}
storageIDsGuard.Lock()
storageIDs[prm.Address] = res.StorageID
storageIDsGuard.Unlock()
return nil
})
}
require.NoError(t, eg.Wait())
require.NoError(t, b.Close())
b = NewBlobovniczaTree(
WithLogger(test.NewLogger(t, true)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(targetWidth),
WithBlobovniczaShallowDepth(targetDepth),
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
metaStub := &storageIDUpdateStub{
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub
rPrm.WorkerLimiter = &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
require.Equal(t, shouldMigrate, dataMigrated)
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
require.NoError(t, b.Close())
}
type storageIDUpdateStub struct {
guard *sync.Mutex
storageIDs map[oid.Address][]byte
updatedCount uint64
}
func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
s.guard.Lock()
defer s.guard.Unlock()
s.storageIDs[addr] = storageID
s.updatedCount++
return nil
}
type rebuildLimiterStub struct{}
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
func (s *rebuildLimiterStub) ReleaseWorkSlot() {}

View file

@ -0,0 +1,26 @@
package common
import (
"context"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type RebuildRes struct {
ObjectsMoved uint64
FilesRemoved uint64
}
type RebuildPrm struct {
MetaStorage MetaStorage
WorkerLimiter ConcurrentWorkersLimiter
}
type MetaStorage interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
}
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}

View file

@ -30,4 +30,5 @@ type Storage interface {
Put(context.Context, PutPrm) (PutRes, error)
Delete(context.Context, DeletePrm) (DeleteRes, error)
Iterate(context.Context, IteratePrm) (IterateRes, error)
Rebuild(context.Context, RebuildPrm) (RebuildRes, error)
}

View file

@ -570,3 +570,7 @@ func (t *FSTree) SetReportErrorFunc(_ func(string, error)) {
func (t *FSTree) SetParentID(parentID string) {
t.metrics.SetParentID(parentID)
}
func (t *FSTree) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}

View file

@ -166,3 +166,7 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
}
return common.IterateRes{}, nil
}
func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}

View file

@ -0,0 +1,45 @@
package blobstor
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type StorageIDUpdate interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
}
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter) error {
var summary common.RebuildRes
var rErr error
for _, storage := range b.storage {
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
MetaStorage: upd,
WorkerLimiter: limiter,
})
summary.FilesRemoved += res.FilesRemoved
summary.ObjectsMoved += res.ObjectsMoved
if err != nil {
b.log.Error(logs.BlobstorRebuildFailedToRebuildStorages,
zap.String("failed_storage_path", storage.Storage.Path()),
zap.String("failed_storage_type", storage.Storage.Type()),
zap.Error(err))
rErr = err
break
}
}
b.log.Info(logs.BlobstorRebuildRebuildStoragesCompleted,
zap.Bool("success", rErr == nil),
zap.Uint64("total_files_removed", summary.FilesRemoved),
zap.Uint64("total_objects_moved", summary.ObjectsMoved))
return rErr
}

View file

@ -229,3 +229,7 @@ func (s *TestStore) Iterate(ctx context.Context, req common.IteratePrm) (common.
}
func (s *TestStore) SetParentID(string) {}
func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}

View file

@ -38,6 +38,7 @@ type StorageEngine struct {
err error
}
evacuateLimiter *evacuationLimiter
rebuildLimiter *rebuildLimiter
}
type shardWrapper struct {
@ -213,13 +214,15 @@ type cfg struct {
shardPoolSize uint32
lowMem bool
rebuildWorkersCount uint32
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
rebuildWorkersCount: 100,
}
}
@ -238,6 +241,7 @@ func New(opts ...Option) *StorageEngine {
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{},
rebuildLimiter: newRebuildLimiter(c.rebuildWorkersCount),
}
}
@ -275,3 +279,10 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
c.lowMem = lowMemCons
}
}
// WithRebuildWorkersCount returns an option to set the count of concurrent rebuild workers.
func WithRebuildWorkersCount(count uint32) Option {
return func(c *cfg) {
c.rebuildWorkersCount = count
}
}

View file

@ -0,0 +1,26 @@
package engine
import "context"
type rebuildLimiter struct {
semaphore chan struct{}
}
func newRebuildLimiter(workersCount uint32) *rebuildLimiter {

We already have errgroup and ants.Pool which allow limiting concurrent routines, why do we create yet another limiter here?

We already have errgroup and ants.Pool which allow limiting concurrent routines, why do we create yet another limiter here?

In general blobovniczatree.Rebuild requires from caller some limiter, but not pool to run tasks.
errgroup or ants.Pool doen't look like something that is suitable for this case: it will be difficult to cancel operations, and rebuilds from different shards should not depend on each other's errors.

In general `blobovniczatree.Rebuild` requires from caller some limiter, but not pool to run tasks. `errgroup` or `ants.Pool` doen't look like something that is suitable for this case: it will be difficult to cancel operations, and rebuilds from different shards should not depend on each other's errors.
return &rebuildLimiter{
semaphore: make(chan struct{}, workersCount),
}
}
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
select {
case l.semaphore <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (l *rebuildLimiter) ReleaseWorkSlot() {
<-l.semaphore
}

View file

@ -110,6 +110,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
shard.WithExpiredLocksCallback(e.processExpiredLocks),
shard.WithDeletedLockCallback(e.processDeletedLocks),
shard.WithReportErrorFunc(e.reportShardErrorBackground),
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
)...)
if err := sh.UpdateID(ctx); err != nil {

View file

@ -5,6 +5,7 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
@ -107,7 +108,7 @@ func (db *DB) UpdateStorageID(prm UpdateStorageIDPrm) (res UpdateStorageIDRes, e
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
exists, err := db.exists(tx, prm.addr, currEpoch)
if err == nil && exists || errors.Is(err, ErrObjectIsExpired) {
if err == nil && exists || errors.As(err, new(logicerr.Logical)) {
err = updateStorageID(tx, prm.addr, prm.id)
}

View file

@ -42,6 +42,18 @@ func (m *blobovniczaTreeMetrics) Close() {
m.m.CloseBlobobvnizcaTree(m.shardID, m.path)
}
func (m *blobovniczaTreeMetrics) SetRebuildStatus(status string) {
m.m.BlobovniczaTreeRebuildStatus(m.shardID, m.path, status)
}
func (m *blobovniczaTreeMetrics) SetRebuildPercent(value uint32) {
m.m.BlobovniczaTreeRebuildPercent(m.shardID, m.path, value)
}
func (m *blobovniczaTreeMetrics) ObjectMoved(d time.Duration) {
m.m.BlobovniczaTreeObjectMoved(m.shardID, m.path, d)
}
func (m *blobovniczaTreeMetrics) Delete(d time.Duration, success, withStorageID bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
}

View file

@ -162,6 +162,9 @@ func (s *Shard) Init(ctx context.Context) error {
s.gc.init(ctx)
s.rb = newRebuilder(s.rebuildLimiter)
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
return nil
}
@ -266,6 +269,9 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
// Close releases all Shard's components.
func (s *Shard) Close() error {
if s.rb != nil {
s.rb.Stop(s.log)
}
components := []interface{ Close() error }{}
if s.pilorama != nil {
@ -310,6 +316,11 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
unlock := s.lockExclusive()
defer unlock()
s.rb.Stop(s.log)
defer func() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
}()
ok, err := s.metaBase.Reload(c.metaOpts...)
if err != nil {
if errors.Is(err, meta.ErrDegradedMode) {

View file

@ -2,14 +2,11 @@ package shard
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@ -33,8 +30,7 @@ func (p *DeletePrm) SetAddresses(addr ...oid.Address) {
p.addr = append(p.addr, addr...)
}
// Delete removes data from the shard's writeCache, metaBase and
// blobStor.
// Delete removes data from the shard's metaBase and blobStor.
func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.Delete",
trace.WithAttributes(
@ -46,10 +42,10 @@ func (s *Shard) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
s.m.RLock()
defer s.m.RUnlock()
return s.delete(ctx, prm)
return s.delete(ctx, prm, false)
}
func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
func (s *Shard) delete(ctx context.Context, prm DeletePrm, skipFailed bool) (DeleteRes, error) {
if s.info.Mode.ReadOnly() {
return DeleteRes{}, ErrReadOnlyMode
} else if s.info.Mode.NoMetabase() {
@ -64,12 +60,18 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
default:
}
s.deleteObjectFromWriteCacheSafe(ctx, addr)
s.deleteFromBlobstorSafe(ctx, addr)
if err := s.deleteFromBlobstor(ctx, addr); err != nil {
if skipFailed {

Why do we no longer delete from the write-cache? It is not obvious from the commit message.

Why do we no longer delete from the write-cache? It is not obvious from the commit message.

updated commit message with explanation, that objects will be deleted from writecache after flush to blobstore.

updated commit message with explanation, that objects will be deleted from writecache after flush to blobstore.
continue
}
return result, err
}
if err := s.deleteFromMetabase(ctx, addr); err != nil {
return result, err // stop on metabase error ?
if skipFailed {
continue
}
return result, err
}
result.deleted++
}
@ -77,16 +79,7 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) {
return result, nil
}
func (s *Shard) deleteObjectFromWriteCacheSafe(ctx context.Context, addr oid.Address) {
if s.hasWriteCache() {
err := s.writeCache.Delete(ctx, addr)
if err != nil && !client.IsErrObjectNotFound(err) && !errors.Is(err, writecache.ErrReadOnly) {
s.log.Warn(logs.ShardCantDeleteObjectFromWriteCache, zap.Error(err))
}
}
}
func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
func (s *Shard) deleteFromBlobstor(ctx context.Context, addr oid.Address) error {
var sPrm meta.StorageIDPrm
sPrm.SetAddress(addr)
@ -95,6 +88,7 @@ func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
s.log.Debug(logs.StorageIDRetrievalFailure,
zap.Stringer("object", addr),
zap.String("error", err.Error()))
return err
}
storageID := res.StorageID()
@ -108,6 +102,7 @@ func (s *Shard) deleteFromBlobstorSafe(ctx context.Context, addr oid.Address) {
zap.Stringer("object_address", addr),
zap.String("error", err.Error()))
}
return err
}
func (s *Shard) deleteFromMetabase(ctx context.Context, addr oid.Address) error {

View file

@ -52,13 +52,18 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = testGet(t, sh, getPrm, hasWriteCache)
require.NoError(t, err)
_, err = sh.Delete(context.TODO(), delPrm)
require.NoError(t, err)
if hasWriteCache {
require.Eventually(t, func() bool {
_, err = sh.Delete(context.Background(), delPrm)
return err == nil
}, 30*time.Second, 100*time.Millisecond)
} else {
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
}
require.Eventually(t, func() bool {
_, err = sh.Get(context.Background(), getPrm)
return client.IsErrObjectNotFound(err)
}, time.Second, 50*time.Millisecond)
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err))
})
t.Run("small object", func(t *testing.T) {
@ -78,12 +83,17 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
_, err = sh.Get(context.Background(), getPrm)
require.NoError(t, err)
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
if hasWriteCache {
require.Eventually(t, func() bool {
_, err = sh.Delete(context.Background(), delPrm)
return err == nil
}, 10*time.Second, 100*time.Millisecond)
} else {
_, err = sh.Delete(context.Background(), delPrm)
require.NoError(t, err)
}
require.Eventually(t, func() bool {
_, err = sh.Get(context.Background(), getPrm)
return client.IsErrObjectNotFound(err)
}, time.Second, 50*time.Millisecond)
_, err = sh.Get(context.Background(), getPrm)
require.True(t, client.IsErrObjectNotFound(err))
})
}

View file

@ -297,7 +297,7 @@ func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) {
deletePrm.SetAddresses(buf...)
// delete accumulated objects
res, err := s.delete(ctx, deletePrm)
res, err := s.delete(ctx, deletePrm, true)
result.deleted = res.deleted
result.failedToDelete = uint64(len(buf)) - res.deleted

View file

@ -0,0 +1,13 @@
package shard
import "context"
type RebuildWorkerLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
type noopRebuildLimiter struct{}
func (l *noopRebuildLimiter) AcquireWorkSlot(context.Context) error { return nil }
func (l *noopRebuildLimiter) ReleaseWorkSlot() {}

View file

@ -0,0 +1,98 @@
package shard
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type rebuilder struct {
mtx *sync.Mutex
wg *sync.WaitGroup
cancel func()
limiter RebuildWorkerLimiter
}
func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
return &rebuilder{
mtx: &sync.Mutex{},
wg: &sync.WaitGroup{},
cancel: nil,
limiter: l,
}
}
func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
r.mtx.Lock()

Is it possible to call this concurrently?
Anyway, why do we block here and not return immediately with "already started"?
This could (?) lead to Stop() stopping only the first started instance.

Is it possible to call this concurrently? Anyway, why do we block here and not return immediately with "already started"? This could (?) lead to `Stop()` stopping only the first started instance.

Is it possible to call this concurrently?

Actually no, mutes is just foolproof.

Anyway, why do we block here and not return immediately with "already started"?

I don't know what to do with such error. The idea was just to maintain the correct state.

This could (?) lead to Stop() stopping only the first started instance.

Stop() stores nil to cancel.

> Is it possible to call this concurrently? Actually no, mutes is just foolproof. > Anyway, why do we block here and not return immediately with "already started"? I don't know what to do with such error. The idea was just to maintain the correct state. > This could (?) lead to Stop() stopping only the first started instance. Stop() stores nil to cancel.
defer r.mtx.Unlock()
r.start(ctx, bs, mb, log)
}
func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
if r.cancel != nil {
r.stop(log)
}
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.wg.Add(1)
go func() {
defer r.wg.Done()
log.Info(logs.BlobstoreRebuildStarted)
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, r.limiter); err != nil {
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
} else {
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
}
}()
}
func (r *rebuilder) Stop(log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.stop(log)
}
func (r *rebuilder) stop(log *logger.Logger) {
if r.cancel == nil {
return
}
r.cancel()
r.wg.Wait()
r.cancel = nil
log.Info(logs.BlobstoreRebuildStopped)
}
var errMBIsNotAvailable = errors.New("metabase is not available")
type mbStorageIDUpdate struct {
mb *meta.DB
}
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if u.mb == nil {
return errMBIsNotAvailable
}
var prm meta.UpdateStorageIDPrm
prm.SetAddress(addr)
prm.SetStorageID(storageID)
_, err := u.mb.UpdateStorageID(prm)
return err
}

View file

@ -38,6 +38,8 @@ type Shard struct {
tsSource TombstoneSource
rb *rebuilder
gcCancel atomic.Value
setModeRequested atomic.Bool
}
@ -121,6 +123,8 @@ type cfg struct {
metricsWriter MetricsWriter
reportErrorFunc func(selfID string, message string, err error)
rebuildLimiter RebuildWorkerLimiter
}
func defaultCfg() *cfg {
@ -129,6 +133,7 @@ func defaultCfg() *cfg {
log: &logger.Logger{Logger: zap.L()},
gcCfg: defaultGCCfg(),
reportErrorFunc: func(string, string, error) {},
rebuildLimiter: &noopRebuildLimiter{},
}
}
@ -366,6 +371,14 @@ func WithExpiredCollectorWorkersCount(count int) Option {
}
}
// WithRebuildWorkerLimiter return option to set concurrent
// workers count of storage rebuild operation.
func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
return func(c *cfg) {
c.rebuildLimiter = l
}
}
func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()

View file

@ -23,16 +23,23 @@ type BlobobvnizcaMetrics interface {
IncOpenBlobovniczaCount(shardID, path string)
DecOpenBlobovniczaCount(shardID, path string)
BlobovniczaTreeRebuildStatus(shardID, path, status string)
BlobovniczaTreeRebuildPercent(shardID, path string, value uint32)
BlobovniczaTreeObjectMoved(shardID, path string, d time.Duration)
}
type blobovnicza struct {
treeMode *shardIDPathModeValue
treeReqDuration *prometheus.HistogramVec
treePut *prometheus.CounterVec
treeGet *prometheus.CounterVec
treeOpenSize *prometheus.GaugeVec
treeOpenItems *prometheus.GaugeVec
treeOpenCounter *prometheus.GaugeVec
treeMode *shardIDPathModeValue
treeReqDuration *prometheus.HistogramVec
treePut *prometheus.CounterVec
treeGet *prometheus.CounterVec
treeOpenSize *prometheus.GaugeVec
treeOpenItems *prometheus.GaugeVec
treeOpenCounter *prometheus.GaugeVec
treeObjectMoveDuration *prometheus.HistogramVec
treeRebuildStatus *shardIDPathModeValue
treeRebuildPercent *prometheus.GaugeVec
}
func newBlobovnicza() *blobovnicza {
@ -75,6 +82,19 @@ func newBlobovnicza() *blobovnicza {
Name: "open_blobovnicza_count",
Help: "Count of opened blobovniczas of Blobovnicza tree",
}, []string{shardIDLabel, pathLabel}),
treeObjectMoveDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: blobovniczaTreeSubSystem,
Name: "object_move_duration_seconds",
Help: "Accumulated Blobovnicza tree object move duration",
}, []string{shardIDLabel, pathLabel}),
treeRebuildStatus: newShardIDPathMode(blobovniczaTreeSubSystem, "rebuild_status", "Blobovnicza tree rebuild status"),
treeRebuildPercent: metrics.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: blobovniczaTreeSubSystem,
Name: "rebuild_complete_percent",
Help: "Percent of rebuild completeness",
}, []string{shardIDLabel, pathLabel}),
}
}
@ -96,6 +116,15 @@ func (b *blobovnicza) CloseBlobobvnizcaTree(shardID, path string) {
shardIDLabel: shardID,
pathLabel: path,
})
b.treeObjectMoveDuration.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.treeRebuildPercent.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.treeRebuildStatus.SetMode(shardID, path, undefinedStatus)
}
func (b *blobovnicza) BlobobvnizcaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) {
@ -163,3 +192,21 @@ func (b *blobovnicza) SubOpenBlobovniczaItems(shardID, path string, items uint64
pathLabel: path,
}).Sub(float64(items))
}
func (b *blobovnicza) BlobovniczaTreeRebuildStatus(shardID, path, status string) {
b.treeRebuildStatus.SetMode(shardID, path, status)
}
func (b *blobovnicza) BlobovniczaTreeObjectMoved(shardID, path string, d time.Duration) {
b.treeObjectMoveDuration.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Observe(d.Seconds())
}
func (b *blobovnicza) BlobovniczaTreeRebuildPercent(shardID, path string, value uint32) {
b.treeRebuildPercent.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Set(float64(value))
}

View file

@ -44,4 +44,5 @@ const (
failedToDeleteStatus = "failed_to_delete"
deletedStatus = "deleted"
undefinedStatus = "undefined"
)