Blobovnicza tree rebuild #812

Merged
fyrchik merged 17 commits from dstepanov-yadro/frostfs-node:feat/shard_migrator_master into master 2024-09-04 19:51:04 +00:00
55 changed files with 2276 additions and 131 deletions

View file

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal" common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -40,7 +40,7 @@ func inspectFunc(cmd *cobra.Command, _ []string) {
common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err)) common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err))
if id := resStorageID.StorageID(); id != nil { if id := resStorageID.StorageID(); id != nil {
cmd.Printf("Object storageID: %s\n\n", blobovnicza.NewIDFromBytes(id).String()) cmd.Printf("Object storageID: %s\n\n", blobovniczatree.NewIDFromBytes(id).Path())
} else { } else {
cmd.Printf("Object does not contain storageID\n\n") cmd.Printf("Object does not contain storageID\n\n")
} }
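
This first hunk tracks the ID type's move from the blobovnicza package into blobovniczatree (see the id.go hunk further down): String() is replaced by Path(), which appends the ".db" extension, so a raw storage ID read from the metabase now resolves to the actual database file name. A minimal sketch of the new mapping, with a hypothetical ID value; fmt and the blobovniczatree import are assumed:

// printStorageIDPath sketches the new ID semantics: the metabase keeps the
// raw ID, Path() derives the on-disk file name by appending ".db".
func printStorageIDPath() {
	rawID := []byte("2/e/1") // hypothetical value returned by resStorageID.StorageID()
	id := blobovniczatree.NewIDFromBytes(rawID)
	fmt.Println(id.Path()) // "2/e/1.db": the file that will actually be opened
}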

View file

@ -103,6 +103,7 @@ type applicationConfiguration struct {
shardPoolSize uint32 shardPoolSize uint32
shards []shardCfg shards []shardCfg
lowMem bool lowMem bool
rebuildWorkers uint32
} }
} }
@ -181,6 +182,8 @@ type subStorageCfg struct {
width uint64 width uint64
leafWidth uint64 leafWidth uint64
openedCacheSize int openedCacheSize int
initWorkerCount int
initInAdvance bool
} }
// readConfig fills applicationConfiguration with raw configuration values // readConfig fills applicationConfiguration with raw configuration values
@ -212,6 +215,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c) a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) }) return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
} }
@ -298,6 +302,8 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
sCfg.width = sub.ShallowWidth() sCfg.width = sub.ShallowWidth()
sCfg.leafWidth = sub.LeafWidth() sCfg.leafWidth = sub.LeafWidth()
sCfg.openedCacheSize = sub.OpenedCacheSize() sCfg.openedCacheSize = sub.OpenedCacheSize()
sCfg.initWorkerCount = sub.InitWorkerCount()
sCfg.initInAdvance = sub.InitInAdvance()
case fstree.Type: case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth() sCfg.depth = sub.Depth()
@ -701,13 +707,14 @@ func initCfgObject(appCfg *config.Config) cfgObject {
} }
func (c *cfg) engineOpts() []engine.Option { func (c *cfg) engineOpts() []engine.Option {
opts := make([]engine.Option, 0, 4) var opts []engine.Option
opts = append(opts, opts = append(opts,
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize), engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
engine.WithErrorThreshold(c.EngineCfg.errorThreshold), engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log), engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem), engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers),
) )
if c.metricsCollector != nil { if c.metricsCollector != nil {
@ -796,7 +803,10 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width), blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth), blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize), blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
blobovniczatree.WithInitInAdvance(sRead.initInAdvance),
blobovniczatree.WithLogger(c.log), blobovniczatree.WithLogger(c.log),
blobovniczatree.WithObjectSizeLimit(shCfg.smallSizeObjectLimit),
} }
if c.metricsCollector != nil { if c.metricsCollector != nil {

View file

@ -15,6 +15,9 @@ const (
// ShardPoolSizeDefault is a default value of routine pool size per-shard to // ShardPoolSizeDefault is a default value of routine pool size per-shard to
// process object PUT operations in a storage engine. // process object PUT operations in a storage engine.
ShardPoolSizeDefault = 20 ShardPoolSizeDefault = 20
// RebuildWorkersCountDefault is the default number of workers used to
// process storage rebuild operations in a storage engine.
RebuildWorkersCountDefault = 100
) )
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found. // ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@ -88,3 +91,11 @@ func ShardErrorThreshold(c *config.Config) uint32 {
func EngineLowMemoryConsumption(c *config.Config) bool { func EngineLowMemoryConsumption(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "low_mem") return config.BoolSafe(c.Sub(subsection), "low_mem")
} }
// EngineRebuildWorkersCount returns the value of the "rebuild_workers_count" config parameter from the "storage" section.
func EngineRebuildWorkersCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "rebuild_workers_count"); v > 0 {
return v
}
return RebuildWorkersCountDefault
}

View file

@ -38,6 +38,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty)) require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty)) require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode()) require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
require.EqualValues(t, engineconfig.RebuildWorkersCountDefault, engineconfig.EngineRebuildWorkersCount(empty))
}) })
const path = "../../../../config/example/node" const path = "../../../../config/example/node"
@ -47,6 +48,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c)) require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c)) require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
require.EqualValues(t, uint32(1000), engineconfig.EngineRebuildWorkersCount(c))
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error { err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
defer func() { defer func() {
@ -97,6 +99,8 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 4, blz.ShallowWidth()) require.EqualValues(t, 4, blz.ShallowWidth())
require.EqualValues(t, 50, blz.OpenedCacheSize()) require.EqualValues(t, 50, blz.OpenedCacheSize())
require.EqualValues(t, 10, blz.LeafWidth()) require.EqualValues(t, 10, blz.LeafWidth())
require.EqualValues(t, 10, blz.InitWorkerCount())
require.EqualValues(t, true, blz.InitInAdvance())
require.Equal(t, "tmp/0/blob", ss[1].Path()) require.Equal(t, "tmp/0/blob", ss[1].Path())
require.EqualValues(t, 0o644, ss[1].Perm()) require.EqualValues(t, 0o644, ss[1].Perm())
@ -146,6 +150,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 4, blz.ShallowWidth()) require.EqualValues(t, 4, blz.ShallowWidth())
require.EqualValues(t, 50, blz.OpenedCacheSize()) require.EqualValues(t, 50, blz.OpenedCacheSize())
require.EqualValues(t, 10, blz.LeafWidth()) require.EqualValues(t, 10, blz.LeafWidth())
require.EqualValues(t, blobovniczaconfig.InitWorkerCountDefault, blz.InitWorkerCount())
require.Equal(t, "tmp/1/blob", ss[1].Path()) require.Equal(t, "tmp/1/blob", ss[1].Path())
require.EqualValues(t, 0o644, ss[1].Perm()) require.EqualValues(t, 0o644, ss[1].Perm())

View file

@ -22,6 +22,9 @@ const (
// OpenedCacheSizeDefault is a default cache size of opened Blobovnicza's. // OpenedCacheSizeDefault is a default cache size of opened Blobovnicza's.
OpenedCacheSizeDefault = 16 OpenedCacheSizeDefault = 16
// InitWorkerCountDefault is the default number of workers used to initialize Blobovniczas.
InitWorkerCountDefault = 5
) )
// From wraps config section into Config. // From wraps config section into Config.
@ -112,3 +115,29 @@ func (x *Config) LeafWidth() uint64 {
"leaf_width", "leaf_width",
) )
} }
// InitWorkerCount returns the value of "init_worker_count" config parameter.
//
// Returns InitWorkerCountDefault if the value is not a positive number.
func (x *Config) InitWorkerCount() int {
d := config.IntSafe(
(*config.Config)(x),
"init_worker_count",
)
if d > 0 {
return int(d)
}
return InitWorkerCountDefault
}
// InitInAdvance returns the value of "init_in_advance" config parameter.
//
// Returns false if the value is not defined or invalid.
func (x *Config) InitInAdvance() bool {
return config.BoolSafe(
(*config.Config)(x),
"init_in_advance",
)
}

View file

@ -92,6 +92,7 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
# Storage engine section # Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15 FROSTFS_STORAGE_SHARD_POOL_SIZE=15
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
## 0 shard ## 0 shard
### Flag to refill Metabase from BlobStor ### Flag to refill Metabase from BlobStor
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
@ -125,6 +126,8 @@ FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH=1
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH=4 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH=4
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_LEAF_WIDTH=10 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_LEAF_WIDTH=10
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_WORKER_COUNT=10
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_IN_ADVANCE=TRUE
### FSTree config ### FSTree config
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE=fstree FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE=fstree
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH=tmp/0/blob FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH=tmp/0/blob

View file

@ -137,6 +137,7 @@
"storage": { "storage": {
"shard_pool_size": 15, "shard_pool_size": 15,
"shard_ro_error_threshold": 100, "shard_ro_error_threshold": 100,
"rebuild_workers_count": 1000,
"shard": { "shard": {
"0": { "0": {
"mode": "read-only", "mode": "read-only",
@ -172,7 +173,9 @@
"depth": 1, "depth": 1,
"width": 4, "width": 4,
"opened_cache_capacity": 50, "opened_cache_capacity": 50,
"leaf_width": 10 "leaf_width": 10,
"init_worker_count": 10,
"init_in_advance": true
}, },
{ {
"type": "fstree", "type": "fstree",

View file

@ -116,6 +116,7 @@ storage:
# note: shard configuration can be omitted for relay node (see `node.relay`) # note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors) shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
rebuild_workers_count: 1000 # number of concurrent workers used to rebuild storage
shard: shard:
default: # section with the default shard parameters default: # section with the default shard parameters
@ -184,6 +185,8 @@ storage:
blobstor: blobstor:
- type: blobovnicza - type: blobovnicza
path: tmp/0/blob/blobovnicza path: tmp/0/blob/blobovnicza
init_worker_count: 10 # number of workers used to initialize blobovniczas
init_in_advance: true
- type: fstree - type: fstree
path: tmp/0/blob # blobstor path path: tmp/0/blob # blobstor path

View file

@ -519,4 +519,36 @@ const (
FailedToCountWritecacheItems = "failed to count writecache items" FailedToCountWritecacheItems = "failed to count writecache items"
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza" AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
FailedToGetContainerCounters = "failed to get container counters values" FailedToGetContainerCounters = "failed to get container counters values"
FailedToRebuildBlobstore = "failed to rebuild blobstore"
BlobstoreRebuildStarted = "blobstore rebuild started"
BlobstoreRebuildCompletedSuccessfully = "blobstore rebuild completed successfully"
BlobstoreRebuildStopped = "blobstore rebuild stopped"
BlobovniczaTreeFixingFileExtensions = "fixing blobovnicza tree file extensions..."
BlobovniczaTreeFixingFileExtensionsCompletedSuccessfully = "fixing blobovnicza tree file extensions completed successfully"
BlobovniczaTreeFixingFileExtensionsFailed = "failed to fix blobovnicza tree file extensions"
BlobovniczaTreeFixingFileExtensionForFile = "fixing blobovnicza file extension..."
BlobovniczaTreeFixingFileExtensionCompletedSuccessfully = "fixing blobovnicza file extension completed successfully"
BlobovniczaTreeFixingFileExtensionFailed = "failed to fix blobovnicza file extension"
BlobstorRebuildFailedToRebuildStorages = "failed to rebuild storages"
BlobstorRebuildRebuildStoragesCompleted = "storages rebuild completed"
BlobovniczaTreeCollectingDBToRebuild = "collecting blobovniczas to rebuild..."
BlobovniczaTreeCollectingDBToRebuildFailed = "collecting blobovniczas to rebuild failed"
BlobovniczaTreeCollectingDBToRebuildSuccess = "collecting blobovniczas to rebuild completed successfully"
BlobovniczaTreeRebuildingBlobovnicza = "rebuilding blobovnicza..."
BlobovniczaTreeRebuildingBlobovniczaFailed = "rebuilding blobovnicza failed"
BlobovniczaTreeRebuildingBlobovniczaSuccess = "rebuilding blobovnicza completed successfully"
BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza = "could not put move info to source blobovnicza"
BlobovniczatreeCouldNotUpdateStorageID = "could not update storage ID"
BlobovniczatreeCouldNotDropMoveInfo = "could not drop move info from source blobovnicza"
BlobovniczatreeCouldNotDeleteFromSource = "could not delete object from source blobovnicza"
BlobovniczaTreeCompletingPreviousRebuild = "completing previous rebuild if failed..."
BlobovniczaTreeCompletedPreviousRebuildSuccess = "previous rebuild completed successfully"
BlobovniczaTreeCompletedPreviousRebuildFailed = "failed to complete previous rebuild"
BlobovniczatreeCouldNotCheckExistenceInSourceDB = "could not check object existence in source blobovnicza"
BlobovniczatreeCouldNotCheckExistenceInTargetDB = "could not check object existence in target blobovnicza"
BlobovniczatreeCouldNotGetObjectFromSourceDB = "could not get object from source blobovnicza"
BlobovniczatreeCouldNotPutObjectToTargetDB = "could not put object to target blobovnicza"
BlobovniczaSavingCountersToMeta = "saving counters to blobovnicza's meta..."
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
) )

View file

@ -104,17 +104,42 @@ func (b *Blobovnicza) Init() error {
func (b *Blobovnicza) initializeCounters() error { func (b *Blobovnicza) initializeCounters() error {
var size uint64 var size uint64
var items uint64 var items uint64
var sizeExists bool
var itemsCountExists bool
err := b.boltDB.View(func(tx *bbolt.Tx) error { err := b.boltDB.View(func(tx *bbolt.Tx) error {
return b.iterateAllBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) { size, sizeExists = hasDataSize(tx)
keysN := uint64(b.Stats().KeyN) items, itemsCountExists = hasItemsCount(tx)
size += keysN * upper
items += keysN if sizeExists && itemsCountExists {
return false, nil return nil
}
return b.iterateAllDataBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
return false, b.ForEach(func(k, v []byte) error {
size += uint64(len(k) + len(v))
items++
return nil
})
}) })
}) })
if err != nil { if err != nil {
return fmt.Errorf("can't determine DB size: %w", err) return fmt.Errorf("can't determine DB size: %w", err)
} }
if (!sizeExists || !itemsCountExists) && !b.boltOptions.ReadOnly {
b.log.Debug(logs.BlobovniczaSavingCountersToMeta, zap.Uint64("size", size), zap.Uint64("items", items))
if err := b.boltDB.Update(func(tx *bbolt.Tx) error {
if err := saveDataSize(tx, size); err != nil {
return err
}
return saveItemsCount(tx, items)
}); err != nil {
b.log.Debug(logs.BlobovniczaSavingCountersToMetaFailed, zap.Uint64("size", size), zap.Uint64("items", items))
return fmt.Errorf("can't save blobovnicza's size and items count: %w", err)
}
b.log.Debug(logs.BlobovniczaSavingCountersToMetaSuccess, zap.Uint64("size", size), zap.Uint64("items", items))
}
b.dataSize.Store(size) b.dataSize.Store(size)
b.itemsCount.Store(items) b.itemsCount.Store(items)
b.metrics.AddOpenBlobovniczaSize(size) b.metrics.AddOpenBlobovniczaSize(size)
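
With this change initializeCounters consults cached counters in the META bucket first and falls back to a full scan of the data buckets (summing len(key)+len(value) per record) only when they are missing; the computed values are then persisted, unless the store is read-only. A minimal sketch of the fast path, assuming the go.etcd.io/bbolt and encoding/binary imports and only the bucket and key names introduced in meta.go further down:

// readCachedCounters mirrors the fast path of initializeCounters:
// if both counters are present, Init can skip the full bucket scan.
func readCachedCounters(db *bbolt.DB) (size, items uint64, ok bool) {
	_ = db.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte("META"))
		if b == nil {
			return nil // meta bucket not created yet: a full scan is required
		}
		vs, vi := b.Get([]byte("data_size")), b.Get([]byte("items_count"))
		if len(vs) == 8 && len(vi) == 8 {
			size = binary.LittleEndian.Uint64(vs)
			items = binary.LittleEndian.Uint64(vi)
			ok = true
		}
		return nil
	})
	return size, items, ok
}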

View file

@ -49,9 +49,10 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
var sizeUpperBound uint64 var sizeUpperBound uint64
var sizeLowerBound uint64 var sizeLowerBound uint64
var dataSize uint64 var dataSize uint64
var recordSize uint64
err := b.boltDB.Update(func(tx *bbolt.Tx) error { err := b.boltDB.Update(func(tx *bbolt.Tx) error {
return b.iterateAllBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) { err := b.iterateAllDataBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
objData := buck.Get(addrKey) objData := buck.Get(addrKey)
if objData == nil { if objData == nil {
// object is not in bucket => continue iterating // object is not in bucket => continue iterating
@ -60,9 +61,27 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
dataSize = uint64(len(objData)) dataSize = uint64(len(objData))
sizeLowerBound = lower sizeLowerBound = lower
sizeUpperBound = upper sizeUpperBound = upper
recordSize = dataSize + uint64(len(addrKey))
found = true found = true
return true, buck.Delete(addrKey) return true, buck.Delete(addrKey)
}) })
if err != nil {
return err
}
if found {
return updateMeta(tx, func(count, size uint64) (uint64, uint64) {
if count > 0 {
count--
}
if size >= recordSize {
size -= recordSize
} else {
size = 0
}
return count, size
})
}
return nil
}) })
if err == nil && !found { if err == nil && !found {
@ -75,7 +94,7 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
zap.String("range", stringifyBounds(sizeLowerBound, sizeUpperBound)), zap.String("range", stringifyBounds(sizeLowerBound, sizeUpperBound)),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
) )
b.itemDeleted(sizeUpperBound) b.itemDeleted(recordSize)
} }
return DeleteRes{}, err return DeleteRes{}, err

View file

@ -24,7 +24,10 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
addrKey := addressKey(addr) addrKey := addressKey(addr)
err := b.boltDB.View(func(tx *bbolt.Tx) error { err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error { return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
if isNonDataBucket(bucketName) {
return nil
}
exists = buck.Get(addrKey) != nil exists = buck.Get(addrKey) != nil
if exists { if exists {
return errInterruptForEach return errInterruptForEach

View file

@ -57,7 +57,11 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
) )
if err := b.boltDB.View(func(tx *bbolt.Tx) error { if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error { return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
if isNonDataBucket(bucketName) {
return nil
}
data = buck.Get(addrKey) data = buck.Get(addrKey)
if data == nil { if data == nil {
return nil return nil

View file

@ -12,11 +12,11 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
// iterateAllBuckets iterates all buckets in db // iterateAllDataBuckets iterates over all data buckets in db
// //
// If the maximum size of the object (b.objSizeLimit) has been changed to lower value, // If the maximum size of the object (b.objSizeLimit) has been changed to lower value,
// then there may be more buckets than the current limit of the object size. // then there may be more buckets than the current limit of the object size.
func (b *Blobovnicza) iterateAllBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error { func (b *Blobovnicza) iterateAllDataBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) { return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) {
buck := tx.Bucket(key) buck := tx.Bucket(key)
if buck == nil { if buck == nil {
@ -138,7 +138,10 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
var elem IterationElement var elem IterationElement
if err := b.boltDB.View(func(tx *bbolt.Tx) error { if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(name []byte, buck *bbolt.Bucket) error { return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
if isNonDataBucket(bucketName) {
return nil
}
return buck.ForEach(func(k, v []byte) error { return buck.ForEach(func(k, v []byte) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():

View file

@ -0,0 +1,103 @@
package blobovnicza
import (
"bytes"
"encoding/binary"
"go.etcd.io/bbolt"
)
const (
dataSizeAndItemsCountBufLength = 8
)
var (
metaBucketName = []byte("META")
dataSizeKey = []byte("data_size")
itemsCountKey = []byte("items_count")
)
func isNonDataBucket(bucketName []byte) bool {
return bytes.Equal(bucketName, incompletedMoveBucketName) || bytes.Equal(bucketName, metaBucketName)
}
func hasDataSize(tx *bbolt.Tx) (uint64, bool) {
b := tx.Bucket(metaBucketName)
if b == nil {
return 0, false
}
v := b.Get(dataSizeKey)
if v == nil {
return 0, false
}
if len(v) != dataSizeAndItemsCountBufLength {
return 0, false
}
return binary.LittleEndian.Uint64(v), true
}
func hasItemsCount(tx *bbolt.Tx) (uint64, bool) {
b := tx.Bucket(metaBucketName)
if b == nil {
return 0, false
}
v := b.Get(itemsCountKey)
if v == nil {
return 0, false
}
if len(v) != dataSizeAndItemsCountBufLength {
return 0, false
}
return binary.LittleEndian.Uint64(v), true
}
func saveDataSize(tx *bbolt.Tx, size uint64) error {
b, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
buf := make([]byte, dataSizeAndItemsCountBufLength)
binary.LittleEndian.PutUint64(buf, size)
return b.Put(dataSizeKey, buf)
}
func saveItemsCount(tx *bbolt.Tx, count uint64) error {
b, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
buf := make([]byte, dataSizeAndItemsCountBufLength)
binary.LittleEndian.PutUint64(buf, count)
return b.Put(itemsCountKey, buf)
}
func updateMeta(tx *bbolt.Tx, updateValues func(count, size uint64) (uint64, uint64)) error {
b, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
var count uint64
var size uint64
v := b.Get(itemsCountKey)
if v != nil {
count = binary.LittleEndian.Uint64(v)
}
v = b.Get(dataSizeKey)
if v != nil {
size = binary.LittleEndian.Uint64(v)
}
count, size = updateValues(count, size)
buf := make([]byte, dataSizeAndItemsCountBufLength)
binary.LittleEndian.PutUint64(buf, size)
if err := b.Put(dataSizeKey, buf); err != nil {
return err
}
binary.LittleEndian.PutUint64(buf, count)
return b.Put(itemsCountKey, buf)
}
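
All of these helpers take the caller's *bbolt.Tx, so counter updates commit atomically with the data mutation they describe; updateMeta reads the current values, lets the closure compute new ones, and writes both keys back. A minimal sketch of how a Put-style caller drives it, written as if inside the blobovnicza package (updateMeta is unexported); recordSize stands for len(key)+len(data):

// putWithMeta sketches the transaction shape used by Put:
// the object write and the counter bump share one transaction.
func putWithMeta(db *bbolt.DB, recordSize uint64) error {
	return db.Update(func(tx *bbolt.Tx) error {
		// ... the object would be written into its size bucket here ...
		return updateMeta(tx, func(count, size uint64) (uint64, uint64) {
			return count + 1, size + recordSize
		})
	})
}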

View file

@ -0,0 +1,108 @@
package blobovnicza
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var incompletedMoveBucketName = []byte("INCOMPLETED_MOVE")
type MoveInfo struct {
Address oid.Address
TargetStorageID []byte
}
func (b *Blobovnicza) PutMoveInfo(ctx context.Context, prm MoveInfo) error {
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.PutMoveInfo",
trace.WithAttributes(
attribute.String("path", b.path),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("target_storage_id", string(prm.TargetStorageID)),
))
defer span.End()
key := addressKey(prm.Address)
return b.boltDB.Update(func(tx *bbolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(incompletedMoveBucketName)
if err != nil {
return err
}
if err := bucket.Put(key, prm.TargetStorageID); err != nil {
return fmt.Errorf("(%T) failed to save move info: %w", b, err)
}
return nil
})
}
func (b *Blobovnicza) DropMoveInfo(ctx context.Context, address oid.Address) error {
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.DropMoveInfo",
trace.WithAttributes(
attribute.String("path", b.path),
attribute.String("address", address.EncodeToString()),
))
defer span.End()
key := addressKey(address)
return b.boltDB.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(incompletedMoveBucketName)
if bucket == nil {
return nil
}
if err := bucket.Delete(key); err != nil {
return fmt.Errorf("(%T) failed to drop move info: %w", b, err)
}
c := bucket.Cursor()
k, v := c.First()
bucketEmpty := k == nil && v == nil
if bucketEmpty {
return tx.DeleteBucket(incompletedMoveBucketName)
}
return nil
})
}
func (b *Blobovnicza) ListMoveInfo(ctx context.Context) ([]MoveInfo, error) {
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.ListMoveInfo",
trace.WithAttributes(
attribute.String("path", b.path),
))
defer span.End()
var result []MoveInfo
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(incompletedMoveBucketName)
if bucket == nil {
return nil
}
return bucket.ForEach(func(k, v []byte) error {
var addr oid.Address
storageID := make([]byte, len(v))
if err := addressFromKey(&addr, k); err != nil {
return err
}
copy(storageID, v)
result = append(result, MoveInfo{
Address: addr,
TargetStorageID: storageID,
})
return nil
})
}); err != nil {
return nil, err
}
return result, nil
}
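
Together these three helpers give the rebuild a write-ahead record: the mover stores the target storage ID before copying an object, Init replays ListMoveInfo to re-protect unfinished moves after a restart, and DropMoveInfo deletes the record (and the whole bucket once it is empty) after the move commits. A hypothetical lifecycle sketch, with the copy step in the middle elided; context and the oid import are assumed:

// moveOne sketches how the move machinery in this PR would use the helpers.
func moveOne(ctx context.Context, src *blobovnicza.Blobovnicza, addr oid.Address, targetID []byte) error {
	// 1. Record the intent so a crash mid-move can be completed on restart.
	if err := src.PutMoveInfo(ctx, blobovnicza.MoveInfo{Address: addr, TargetStorageID: targetID}); err != nil {
		return err
	}
	// 2. ... copy the object to the target and update the metabase here ...
	// 3. Move committed: drop the record; the bucket disappears when empty.
	return src.DropMoveInfo(ctx, addr)
}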

View file

@ -56,8 +56,9 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
defer span.End() defer span.End()
sz := uint64(len(prm.objData)) sz := uint64(len(prm.objData))
bucketName, upperBound := bucketForSize(sz) bucketName := bucketForSize(sz)
key := addressKey(prm.addr) key := addressKey(prm.addr)
recordSize := sz + uint64(len(key))
err := b.boltDB.Batch(func(tx *bbolt.Tx) error { err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
buck := tx.Bucket(bucketName) buck := tx.Bucket(bucketName)
@ -73,10 +74,12 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
return fmt.Errorf("(%T) could not save object in bucket: %w", b, err) return fmt.Errorf("(%T) could not save object in bucket: %w", b, err)
} }
return nil return updateMeta(tx, func(count, size uint64) (uint64, uint64) {
return count + 1, size + recordSize
})
}) })
if err == nil { if err == nil {
b.itemAdded(upperBound) b.itemAdded(recordSize)
} }
return PutRes{}, err return PutRes{}, err

View file

@ -29,9 +29,8 @@ func bucketKeyFromBounds(upperBound uint64) []byte {
return buf[:ln] return buf[:ln]
} }
func bucketForSize(sz uint64) ([]byte, uint64) { func bucketForSize(sz uint64) []byte {
upperBound := upperPowerOfTwo(sz) return bucketKeyFromBounds(upperPowerOfTwo(sz))
return bucketKeyFromBounds(upperBound), upperBound
} }
func upperPowerOfTwo(v uint64) uint64 { func upperPowerOfTwo(v uint64) uint64 {

View file

@ -34,7 +34,7 @@ func TestSizes(t *testing.T) {
upperBound: 4 * firstBucketBound, upperBound: 4 * firstBucketBound,
}, },
} { } {
key, _ := bucketForSize(item.sz) key := bucketForSize(item.sz)
require.Equal(t, bucketKeyFromBounds(item.upperBound), key) require.Equal(t, bucketKeyFromBounds(item.upperBound), key)
} }
} }
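
bucketForSize previously returned the bucket's upper bound as well, and Put/Delete charged that bound to the counters; with exact per-record accounting in META, only the key is needed. For example, a 10 KiB object still lands in the bucket keyed by upperPowerOfTwo(10240) = 16384, but the counters now move by the real record size. A small in-package sketch of the distinction:

func exampleBucketVsAccounting(addrKey, objData []byte) {
	key := bucketForSize(uint64(len(objData)))        // bucket choice: still the power-of-two bound
	recordSize := uint64(len(addrKey) + len(objData)) // accounting: exact bytes stored
	_, _ = key, recordSize
}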

View file

@ -21,8 +21,8 @@ func (db *activeDB) Close() {
db.shDB.Close() db.shDB.Close()
} }
func (db *activeDB) Path() string { func (db *activeDB) SystemPath() string {
return db.shDB.Path() return db.shDB.SystemPath()
} }
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put). // activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
@ -154,7 +154,7 @@ func (m *activeDBManager) getNextSharedDB(lvlPath string) (*sharedDB, error) {
var next *sharedDB var next *sharedDB
for iterCount < m.leafWidth { for iterCount < m.leafWidth {
path := filepath.Join(lvlPath, u64ToHexString(idx)) path := filepath.Join(lvlPath, u64ToHexStringExt(idx))
shDB := m.dbManager.GetByPath(path) shDB := m.dbManager.GetByPath(path)
db, err := shDB.Open() // open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close() db, err := shDB.Open() // open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close()
if err != nil { if err != nil {
@ -192,7 +192,7 @@ func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
if !ok { if !ok {
return false, 0 return false, 0
} }
return true, u64FromHexString(filepath.Base(db.Path())) return true, u64FromHexString(filepath.Base(db.SystemPath()))
} }
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) { func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {

View file

@ -4,6 +4,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"strconv" "strconv"
"strings"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
@ -54,15 +56,22 @@ import (
type Blobovniczas struct { type Blobovniczas struct {
cfg cfg
commondbManager *dbManager commondbManager *dbManager
activeDBManager *activeDBManager activeDBManager *activeDBManager
dbCache *dbCache dbCache *dbCache
deleteProtectedObjects *addressMap
dbFilesGuard *sync.RWMutex
rebuildGuard *sync.RWMutex
} }
var _ common.Storage = (*Blobovniczas)(nil) var _ common.Storage = (*Blobovniczas)(nil)
var errPutFailed = errors.New("could not save the object in any blobovnicza") var errPutFailed = errors.New("could not save the object in any blobovnicza")
const (
dbExtension = ".db"
)
// NewBlobovniczaTree returns new instance of blobovniczas tree. // NewBlobovniczaTree returns new instance of blobovniczas tree.
func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) { func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
blz = new(Blobovniczas) blz = new(Blobovniczas)
@ -76,9 +85,12 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
blz.blzLeafWidth = blz.blzShallowWidth blz.blzLeafWidth = blz.blzShallowWidth
} }
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.blzLeafWidth, blz.readOnly, blz.metrics.Blobovnicza(), blz.log) blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth) blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager) blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
blz.deleteProtectedObjects = newAddressMap()
blz.dbFilesGuard = &sync.RWMutex{}
blz.rebuildGuard = &sync.RWMutex{}
return blz return blz
} }
@ -94,14 +106,16 @@ func addressHash(addr *oid.Address, path string) uint64 {
return hrw.StringHash(a + path) return hrw.StringHash(a + path)
} }
// converts uint64 to hex string.
func u64ToHexString(ind uint64) string { func u64ToHexString(ind uint64) string {
return strconv.FormatUint(ind, 16) return strconv.FormatUint(ind, 16)
} }
// converts uint64 hex string to uint64. func u64ToHexStringExt(ind uint64) string {
return strconv.FormatUint(ind, 16) + dbExtension
}
func u64FromHexString(str string) uint64 { func u64FromHexString(str string) uint64 {
v, err := strconv.ParseUint(str, 16, 64) v, err := strconv.ParseUint(strings.TrimSuffix(str, dbExtension), 16, 64)
if err != nil { if err != nil {
panic(fmt.Sprintf("blobovnicza name is not an index %s", str)) panic(fmt.Sprintf("blobovnicza name is not an index %s", str))
} }
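
Leaf databases now carry a ".db" extension while intermediate directories keep bare hex names, and u64FromHexString trims the suffix so both forms parse. A quick in-package round trip:

func exampleNaming() {
	_ = u64ToHexString(10)       // "a": directory name
	_ = u64ToHexStringExt(10)    // "a.db": leaf database file name
	_ = u64FromHexString("a.db") // 10: the suffix is trimmed before parsing
}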

View file

@ -15,8 +15,9 @@ import (
type dbCache struct { type dbCache struct {
cacheGuard *sync.RWMutex cacheGuard *sync.RWMutex
cache simplelru.LRUCache[string, *sharedDB] cache simplelru.LRUCache[string, *sharedDB]
pathLock *utilSync.KeyLocker[string] pathLock *utilSync.KeyLocker[string] // the order of locks is important: pathLock first, cacheGuard second
closed bool closed bool
nonCached map[string]struct{}
dbManager *dbManager dbManager *dbManager
} }
@ -34,6 +35,7 @@ func newDBCache(size int, dbManager *dbManager) *dbCache {
cache: cache, cache: cache,
dbManager: dbManager, dbManager: dbManager,
pathLock: utilSync.NewKeyLocker[string](), pathLock: utilSync.NewKeyLocker[string](),
nonCached: make(map[string]struct{}),
} }
} }
@ -59,6 +61,27 @@ func (c *dbCache) GetOrCreate(path string) *sharedDB {
return c.create(path) return c.create(path)
} }
func (c *dbCache) EvictAndMarkNonCached(path string) {
c.pathLock.Lock(path)
defer c.pathLock.Unlock(path)
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
c.cache.Remove(path)
c.nonCached[path] = struct{}{}
}
func (c *dbCache) RemoveFromNonCached(path string) {
c.pathLock.Lock(path)
defer c.pathLock.Unlock(path)
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
delete(c.nonCached, path)
}
func (c *dbCache) getExisted(path string) *sharedDB { func (c *dbCache) getExisted(path string) *sharedDB {
c.cacheGuard.Lock() c.cacheGuard.Lock()
defer c.cacheGuard.Unlock() defer c.cacheGuard.Unlock()
@ -94,7 +117,9 @@ func (c *dbCache) put(path string, db *sharedDB) bool {
c.cacheGuard.Lock() c.cacheGuard.Lock()
defer c.cacheGuard.Unlock() defer c.cacheGuard.Unlock()
if !c.closed { _, isNonCached := c.nonCached[path]
if !isNonCached && !c.closed {
c.cache.Add(path, db) c.cache.Add(path, db)
return true return true
} }
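
The nonCached set keeps a database out of the LRU while it is being rebuilt: EvictAndMarkNonCached removes it and blocks re-insertion (put now consults the set), and RemoveFromNonCached lifts the ban once the leaf has been migrated away. A sketch of the intended calling pattern around one leaf; the wrapper name is hypothetical:

func rebuildLeaf(cache *dbCache, path string) {
	cache.EvictAndMarkNonCached(path) // no cached handles may appear while objects move out
	defer cache.RemoveFromNonCached(path)
	// ... migrate objects out of the leaf and remove its file here ...
}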

View file

@ -2,15 +2,24 @@ package blobovniczatree
import ( import (
"context" "context"
"errors"
"os"
"path/filepath"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
) )
var errFailedToChangeExtensionReadOnly = errors.New("failed to change blobovnicza extension: read only mode")
// Open opens blobovnicza tree. // Open opens blobovnicza tree.
func (b *Blobovniczas) Open(readOnly bool) error { func (b *Blobovniczas) Open(readOnly bool) error {
b.readOnly = readOnly b.readOnly = readOnly
b.metrics.SetMode(readOnly) b.metrics.SetMode(readOnly)
b.metrics.SetRebuildStatus(rebuildStatusNotStarted)
b.openManagers() b.openManagers()
return nil return nil
} }
@ -21,22 +30,84 @@ func (b *Blobovniczas) Open(readOnly bool) error {
func (b *Blobovniczas) Init() error { func (b *Blobovniczas) Init() error {
b.log.Debug(logs.BlobovniczatreeInitializingBlobovniczas) b.log.Debug(logs.BlobovniczatreeInitializingBlobovniczas)
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensions)
if err := b.addDBExtensionToDBs(b.rootPath, 0); err != nil {
b.log.Error(logs.BlobovniczaTreeFixingFileExtensionsFailed, zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionsCompletedSuccessfully)
if b.readOnly { if b.readOnly {
b.log.Debug(logs.BlobovniczatreeReadonlyModeSkipBlobovniczasInitialization) b.log.Debug(logs.BlobovniczatreeReadonlyModeSkipBlobovniczasInitialization)
return nil return nil
} }
return b.iterateLeaves(context.TODO(), func(p string) (bool, error) { return b.initializeDBs(context.TODO())
shBlz := b.getBlobovniczaWithoutCaching(p) }
_, err := shBlz.Open()
if err != nil {
return true, err
}
defer shBlz.Close()
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
err := util.MkdirAllX(b.rootPath, b.perm)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(b.blzInitWorkerCount)
visited := make(map[string]struct{})
err = b.iterateExistingDBPaths(egCtx, func(p string) (bool, error) {
visited[p] = struct{}{}
eg.Go(func() error {
shBlz := b.getBlobovniczaWithoutCaching(p)
blz, err := shBlz.Open()
if err != nil {
return err
}
defer shBlz.Close()
moveInfo, err := blz.ListMoveInfo(egCtx)
if err != nil {
return err
}
for _, move := range moveInfo {
b.deleteProtectedObjects.Add(move.Address)
}
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return nil
})
return false, nil return false, nil
}) })
if err != nil {
_ = eg.Wait()
return err
}
if b.createDBInAdvance {
err = b.iterateSortedLeaves(egCtx, nil, func(p string) (bool, error) {
if _, found := visited[p]; found {
return false, nil
}
eg.Go(func() error {
shBlz := b.getBlobovniczaWithoutCaching(p)
_, err := shBlz.Open()
if err != nil {
return err
}
defer shBlz.Close()
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return nil
})
return false, nil
})
if err != nil {
_ = eg.Wait()
return err
}
}
return eg.Wait()
} }
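
Initialization is now concurrent: an errgroup bounded by init_worker_count opens every existing database (replaying its pending move records into deleteProtectedObjects), and with init_in_advance enabled the remaining leaves of the configured tree shape are created eagerly. The same bounded fan-out pattern in isolation, with a hypothetical open callback; golang.org/x/sync/errgroup is assumed:

func initAll(ctx context.Context, paths []string, workers int, open func(string) error) error {
	eg, _ := errgroup.WithContext(ctx)
	eg.SetLimit(workers) // init_worker_count bounds concurrent opens
	for _, p := range paths {
		p := p // capture for the goroutine
		eg.Go(func() error { return open(p) })
	}
	return eg.Wait()
}
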
func (b *Blobovniczas) openManagers() { func (b *Blobovniczas) openManagers() {
@ -64,3 +135,37 @@ func (b *Blobovniczas) getBlobovnicza(p string) *sharedDB {
func (b *Blobovniczas) getBlobovniczaWithoutCaching(p string) *sharedDB { func (b *Blobovniczas) getBlobovniczaWithoutCaching(p string) *sharedDB {
return b.commondbManager.GetByPath(p) return b.commondbManager.GetByPath(p)
} }
func (b *Blobovniczas) addDBExtensionToDBs(path string, depth uint64) error {
entries, err := os.ReadDir(path)
if os.IsNotExist(err) && depth == 0 {
return nil
}
for _, entry := range entries {
if entry.IsDir() {
if err := b.addDBExtensionToDBs(filepath.Join(path, entry.Name()), depth+1); err != nil {
return err
}
continue
}
if strings.HasSuffix(entry.Name(), dbExtension) {
continue
}
if b.readOnly {
return errFailedToChangeExtensionReadOnly
}
sourcePath := filepath.Join(path, entry.Name())
targetPath := filepath.Join(path, entry.Name()+dbExtension)
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionForFile, zap.String("source", sourcePath), zap.String("target", targetPath))
if err := os.Rename(sourcePath, targetPath); err != nil {
b.log.Error(logs.BlobovniczaTreeFixingFileExtensionFailed, zap.String("source", sourcePath), zap.String("target", targetPath), zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionCompletedSuccessfully, zap.String("source", sourcePath), zap.String("target", targetPath))
}
return nil
}

View file

@ -0,0 +1,189 @@
package blobovniczatree
import (
"context"
"os"
"path/filepath"
"strings"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"github.com/stretchr/testify/require"
)
func TestDBExtensionFix(t *testing.T) {
root := t.TempDir()
createTestTree(t, 0, 2, 3, root)
t.Run("adds suffix if not exists", func(t *testing.T) {
openAndCloseTestTree(t, 2, 3, root)
validateTestTree(t, root)
})
t.Run("not adds second suffix if exists", func(t *testing.T) {
openAndCloseTestTree(t, 2, 3, root)
validateTestTree(t, root)
})
}
func createTestTree(t *testing.T, currentDepth, depth, width uint64, path string) {
if currentDepth == depth {
var w uint64
for ; w < width; w++ {
dbPath := filepath.Join(path, u64ToHexString(w))
b := blobovnicza.New(blobovnicza.WithPath(dbPath))
require.NoError(t, b.Open())
require.NoError(t, b.Init())
require.NoError(t, b.Close())
}
return
}
var w uint64
for ; w < width; w++ {
createTestTree(t, currentDepth+1, depth, width, filepath.Join(path, u64ToHexString(w)))
}
}
func openAndCloseTestTree(t *testing.T, depth, width uint64, path string) {
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(depth),
WithBlobovniczaShallowWidth(width),
WithRootPath(path),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
require.NoError(t, blz.Close())
}
func validateTestTree(t *testing.T, path string) {
entries, err := os.ReadDir(path)
require.NoError(t, err)
for _, entry := range entries {
if entry.IsDir() {
validateTestTree(t, filepath.Join(path, entry.Name()))
} else {
require.True(t, strings.HasSuffix(entry.Name(), dbExtension))
require.False(t, strings.HasSuffix(strings.TrimSuffix(entry.Name(), dbExtension), dbExtension))
}
}
}
func TestObjectsAvailableAfterDepthAndWidthEdit(t *testing.T) {
t.Parallel()
rootDir := t.TempDir()
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
obj35 := blobstortest.NewObject(10 * 1024)
addr35 := objectCore.AddressOf(obj35)
raw, err := obj35.Marshal()
require.NoError(t, err)
pRes35, err := blz.Put(context.Background(), common.PutPrm{
Address: addr35,
Object: obj35,
RawData: raw,
})
require.NoError(t, err)
gRes, err := blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
require.NoError(t, blz.Close())
// change depth and width
blz = NewBlobovniczaTree(
WithBlobovniczaShallowDepth(5),
WithBlobovniczaShallowWidth(2),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
obj52 := blobstortest.NewObject(10 * 1024)
addr52 := objectCore.AddressOf(obj52)
raw, err = obj52.Marshal()
require.NoError(t, err)
pRes52, err := blz.Put(context.Background(), common.PutPrm{
Address: addr52,
Object: obj52,
RawData: raw,
})
require.NoError(t, err)
require.NoError(t, blz.Close())
// change depth and width back
blz = NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr52,
StorageID: pRes52.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj52, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr52,
})
require.NoError(t, err)
require.EqualValues(t, obj52, gRes.Object)
require.NoError(t, blz.Close())
}

View file

@ -3,6 +3,7 @@ package blobovniczatree
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"errors"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -18,6 +19,8 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
var errObjectIsDeleteProtected = errors.New("object is delete protected")
// Delete deletes object from blobovnicza tree. // Delete deletes object from blobovnicza tree.
// //
// If blobocvnicza ID is specified, only this blobovnicza is processed. // If blobocvnicza ID is specified, only this blobovnicza is processed.
@ -43,12 +46,22 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
return common.DeleteRes{}, common.ErrReadOnly return common.DeleteRes{}, common.ErrReadOnly
} }
if b.rebuildGuard.TryRLock() {
defer b.rebuildGuard.RUnlock()
} else {
return common.DeleteRes{}, errRebuildInProgress
}
if b.deleteProtectedObjects.Contains(prm.Address) {
return common.DeleteRes{}, errObjectIsDeleteProtected
}
var bPrm blobovnicza.DeletePrm var bPrm blobovnicza.DeletePrm
bPrm.SetAddress(prm.Address) bPrm.SetAddress(prm.Address)
if prm.StorageID != nil { if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID) id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String()) shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open() blz, err := shBlz.Open()
if err != nil { if err != nil {
return res, err return res, err
@ -63,7 +76,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
objectFound := false objectFound := false
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.deleteObjectFromLevel(ctx, bPrm, p) res, err = b.deleteObjectFromLevel(ctx, bPrm, p)
if err != nil { if err != nil {
if !client.IsErrObjectNotFound(err) { if !client.IsErrObjectNotFound(err) {
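
Delete now takes the shared side of rebuildGuard without blocking: if a rebuild holds the exclusive lock, the call fails fast with errRebuildInProgress instead of queueing behind a long migration, and addresses with in-flight moves are refused via deleteProtectedObjects. The try-lock pattern in isolation, assuming errRebuildInProgress from elsewhere in this PR and the standard sync package:

func guardedDelete(rebuildGuard *sync.RWMutex, del func() error) error {
	if !rebuildGuard.TryRLock() { // a rebuild holds the write lock: fail fast
		return errRebuildInProgress
	}
	defer rebuildGuard.RUnlock()
	return del()
}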

View file

@ -36,8 +36,8 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
defer span.End() defer span.End()
if prm.StorageID != nil { if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID) id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String()) shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open() blz, err := shBlz.Open()
if err != nil { if err != nil {
return common.ExistsRes{}, err return common.ExistsRes{}, err
@ -51,7 +51,7 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
var gPrm blobovnicza.GetPrm var gPrm blobovnicza.GetPrm
gPrm.SetAddress(prm.Address) gPrm.SetAddress(prm.Address)
err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { err := b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
_, err := b.getObjectFromLevel(ctx, gPrm, p) _, err := b.getObjectFromLevel(ctx, gPrm, p)
if err != nil { if err != nil {
if !client.IsErrObjectNotFound(err) { if !client.IsErrObjectNotFound(err) {

View file

@ -55,7 +55,7 @@ func TestExistsInvalidStorageID(t *testing.T) {
// An invalid boltdb file is created so that it returns an error when opened // An invalid boltdb file is created so that it returns an error when opened
require.NoError(t, os.MkdirAll(filepath.Join(dir, relBadFileDir), os.ModePerm)) require.NoError(t, os.MkdirAll(filepath.Join(dir, relBadFileDir), os.ModePerm))
require.NoError(t, os.WriteFile(filepath.Join(dir, relBadFileDir, badFileName), []byte("not a boltdb file content"), 0o777)) require.NoError(t, os.WriteFile(filepath.Join(dir, relBadFileDir, badFileName+".db"), []byte("not a boltdb file content"), 0o777))
res, err := b.Exists(context.Background(), common.ExistsPrm{Address: addr, StorageID: []byte(filepath.Join(relBadFileDir, badFileName))}) res, err := b.Exists(context.Background(), common.ExistsPrm{Address: addr, StorageID: []byte(filepath.Join(relBadFileDir, badFileName))})
require.Error(t, err) require.Error(t, err)

View file

@ -47,8 +47,8 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
bPrm.SetAddress(prm.Address) bPrm.SetAddress(prm.Address)
if prm.StorageID != nil { if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID) id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String()) shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open() blz, err := shBlz.Open()
if err != nil { if err != nil {
return res, err return res, err
@ -63,7 +63,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
return res, err return res, err
} }
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.getObjectFromLevel(ctx, bPrm, p) res, err = b.getObjectFromLevel(ctx, bPrm, p)
if err != nil { if err != nil {
if !client.IsErrObjectNotFound(err) { if !client.IsErrObjectNotFound(err) {

View file

@ -46,8 +46,8 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
defer span.End() defer span.End()
if prm.StorageID != nil { if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID) id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String()) shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open() blz, err := shBlz.Open()
if err != nil { if err != nil {
return common.GetRangeRes{}, err return common.GetRangeRes{}, err
@ -64,7 +64,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
objectFound := false objectFound := false
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.getRangeFromLevel(ctx, prm, p) res, err = b.getRangeFromLevel(ctx, prm, p)
if err != nil { if err != nil {
outOfBounds := isErrOutOfRange(err) outOfBounds := isErrOutOfRange(err)

View file

@ -1,4 +1,4 @@
package blobovnicza package blobovniczatree
// ID represents Blobovnicza identifier. // ID represents Blobovnicza identifier.
type ID []byte type ID []byte
@ -8,8 +8,8 @@ func NewIDFromBytes(v []byte) *ID {
return (*ID)(&v) return (*ID)(&v)
} }
func (id ID) String() string { func (id ID) Path() string {
return string(id) return string(id) + dbExtension
} }
func (id ID) Bytes() []byte { func (id ID) Bytes() []byte {

View file

@ -3,7 +3,9 @@ package blobovniczatree
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"path/filepath" "path/filepath"
"strings"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -54,7 +56,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
return prm.Handler(common.IterationElement{ return prm.Handler(common.IterationElement{
Address: elem.Address(), Address: elem.Address(),
ObjectData: data, ObjectData: data,
StorageID: []byte(p), StorageID: []byte(strings.TrimSuffix(p, dbExtension)),
}) })
} }
return nil return nil
@ -69,7 +71,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
// iterator over all Blobovniczas in unsorted order. Break on f's error return. // iterator over all Blobovniczas in unsorted order. Break on f's error return.
func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error { func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
return b.iterateLeaves(ctx, func(p string) (bool, error) { return b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
shBlz := b.getBlobovnicza(p) shBlz := b.getBlobovnicza(p)
blz, err := shBlz.Open() blz, err := shBlz.Open()
if err != nil { if err != nil {
@ -90,7 +92,9 @@ func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors boo
}) })
} }
// iterator over the paths of Blobovniczas sorted by weight. // iterateSortedLeaves iterates over the paths of Blobovniczas sorted by weight.
//
// Uses depth, width and leaf width for iteration.
func (b *Blobovniczas) iterateSortedLeaves(ctx context.Context, addr *oid.Address, f func(string) (bool, error)) error { func (b *Blobovniczas) iterateSortedLeaves(ctx context.Context, addr *oid.Address, f func(string) (bool, error)) error {
_, err := b.iterateSorted( _, err := b.iterateSorted(
ctx, ctx,
@ -130,7 +134,9 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
} }
indices := indexSlice(levelWidth) indices := indexSlice(levelWidth)
hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...))) if !isLeafLevel {
hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...)))
}
exec := uint64(len(curPath)) == execDepth exec := uint64(len(curPath)) == execDepth
@ -140,10 +146,16 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
return false, ctx.Err() return false, ctx.Err()
default: default:
} }
lastPart := u64ToHexString(indices[i])
if isLeafLevel {
lastPart = u64ToHexStringExt(indices[i])
}
if i == 0 { if i == 0 {
curPath = append(curPath, u64ToHexString(indices[i])) curPath = append(curPath, lastPart)
} else { } else {
curPath[len(curPath)-1] = u64ToHexString(indices[i]) curPath[len(curPath)-1] = lastPart
} }
if exec { if exec {
@ -162,9 +174,110 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
return false, nil return false, nil
} }
// iterator over the paths of Blobovniczas in random order. // iterateExistingDBPaths iterates over the paths of Blobovniczas in no particular order.
func (b *Blobovniczas) iterateLeaves(ctx context.Context, f func(string) (bool, error)) error { //
return b.iterateSortedLeaves(ctx, nil, f) // Uses existing blobovnicza files for iteration.
func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string) (bool, error)) error {
b.dbFilesGuard.RLock()
defer b.dbFilesGuard.RUnlock()
_, err := b.iterateExistingDBPathsDFS(ctx, "", f)
return err
}
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) {
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { // uninitialized tree in read-only mode
return false, nil
}
if err != nil {
return false, err
}
for _, entry := range entries {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
if entry.IsDir() {
stop, err := b.iterateExistingDBPathsDFS(ctx, filepath.Join(path, entry.Name()), f)
if err != nil {
return false, err
}
if stop {
return true, nil
}
} else {
stop, err := f(filepath.Join(path, entry.Name()))
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
return false, nil
}
func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error {
b.dbFilesGuard.RLock()
defer b.dbFilesGuard.RUnlock()
_, err := b.iterateSordedDBPathsInternal(ctx, "", addr, f)
return err
}
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { // uninitialized tree in read-only mode
return false, nil
}
if err != nil {
return false, err
}
var dbIdxs []uint64
var dirIdxs []uint64
for _, entry := range entries {
idx := u64FromHexString(entry.Name())
if entry.IsDir() {
dirIdxs = append(dirIdxs, idx)
} else {
dbIdxs = append(dbIdxs, idx)
}
}
if len(dbIdxs) > 0 {
for _, dbIdx := range dbIdxs {
dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx))
stop, err := f(dbPath)
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
if len(dirIdxs) > 0 {
hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path))
for _, dirIdx := range dirIdxs {
dirPath := filepath.Join(path, u64ToHexString(dirIdx))
stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f)
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
return false, nil
} }
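The two iterators above tell directories and leaf databases apart purely by name. A minimal sketch of the naming helpers they rely on, assuming plain hex indices for directories and a ".db"-suffixed hex index for database files (the extension and the error handling are assumptions here, not the PR's code):

package naming

import (
	"strconv"
	"strings"
)

// u64ToHexString renders a directory index, e.g. 42 -> "2a".
func u64ToHexString(idx uint64) string {
	return strconv.FormatUint(idx, 16)
}

// u64ToHexStringExt renders a leaf database name, e.g. 42 -> "2a.db".
func u64ToHexStringExt(idx uint64) string {
	return u64ToHexString(idx) + ".db"
}

// u64FromHexString parses either form back to an index.
func u64FromHexString(name string) uint64 {
	v, err := strconv.ParseUint(strings.TrimSuffix(name, ".db"), 16, 64)
	if err != nil {
		return 0 // sketch only: the real helper may treat bad names differently
	}
	return v
}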
// makes slice of uint64 values from 0 to number-1. // makes slice of uint64 values from 0 to number-1.

View file

@ -0,0 +1,42 @@
package blobovniczatree
import (
"context"
"testing"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func TestIterateSortedLeavesAndDBPathsAreSame(t *testing.T) {
t.Parallel()
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(t.TempDir()),
)
blz.createDBInAdvance = true
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
defer func() {
require.NoError(t, blz.Close())
}()
addr := oidtest.Address()
var leaves []string
var dbPaths []string
blz.iterateSortedLeaves(context.Background(), &addr, func(s string) (bool, error) {
leaves = append(leaves, s)
return false, nil
})
blz.iterateSortedDBPaths(context.Background(), addr, func(s string) (bool, error) {
dbPaths = append(dbPaths, s)
return false, nil
})
require.Equal(t, leaves, dbPaths)
}

View file

@ -1,7 +1,9 @@
package blobovniczatree package blobovniczatree
import ( import (
"errors"
"fmt" "fmt"
"os"
"path/filepath" "path/filepath"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -12,9 +14,11 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
var errClosingClosedBlobovnicza = errors.New("closing closed blobovnicza is not allowed")
// sharedDB is responsible for opening and closing the file of a single blobovnicza. // sharedDB is responsible for opening and closing the file of a single blobovnicza.
type sharedDB struct { type sharedDB struct {
guard *sync.RWMutex cond *sync.Cond
blcza *blobovnicza.Blobovnicza blcza *blobovnicza.Blobovnicza
refCount uint32 refCount uint32
@ -31,8 +35,9 @@ func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
) *sharedDB { ) *sharedDB {
return &sharedDB{ return &sharedDB{
guard: &sync.RWMutex{}, cond: &sync.Cond{
L: &sync.RWMutex{},
},
options: options, options: options,
path: path, path: path,
readOnly: readOnly, readOnly: readOnly,
@ -48,8 +53,8 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
return nil, errClosed return nil, errClosed
} }
b.guard.Lock() b.cond.L.Lock()
defer b.guard.Unlock() defer b.cond.L.Unlock()
if b.refCount > 0 { if b.refCount > 0 {
b.refCount++ b.refCount++
@ -77,11 +82,12 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
} }
func (b *sharedDB) Close() { func (b *sharedDB) Close() {
b.guard.Lock() b.cond.L.Lock()
defer b.guard.Unlock() defer b.cond.L.Unlock()
if b.refCount == 0 { if b.refCount == 0 {
b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path)) b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
b.cond.Broadcast()
return return
} }
@ -99,33 +105,109 @@ func (b *sharedDB) Close() {
} }
b.refCount-- b.refCount--
if b.refCount == 1 {
b.cond.Broadcast()
}
} }
func (b *sharedDB) Path() string { func (b *sharedDB) CloseAndRemoveFile() error {
b.cond.L.Lock()
for b.refCount > 1 { // loop: a new Open may acquire a reference between Broadcast and wake-up
b.cond.Wait()
}
defer b.cond.L.Unlock()
if b.refCount == 0 {
return errClosingClosedBlobovnicza
}
if err := b.blcza.Close(); err != nil {
b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
zap.String("id", b.path),
zap.String("error", err.Error()),
)
return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err)
}
b.refCount = 0
b.blcza = nil
b.openDBCounter.Dec()
return os.Remove(b.path)
}
func (b *sharedDB) SystemPath() string {
return b.path return b.path
} }
// levelDbManager stores pointers to the sharedDBs for the leaf directory of the blobovnicza tree. // levelDbManager stores pointers to the sharedDBs for the leaf directory of the blobovnicza tree.
type levelDbManager struct { type levelDbManager struct {
databases []*sharedDB dbMtx *sync.RWMutex
databases map[uint64]*sharedDB
options []blobovnicza.Option
path string
readOnly bool
metrics blobovnicza.Metrics
openDBCounter *openDBCounter
closedFlag *atomic.Bool
log *logger.Logger
} }
func newLevelDBManager(width uint64, options []blobovnicza.Option, rootPath string, lvlPath string, func newLevelDBManager(options []blobovnicza.Option, rootPath string, lvlPath string,
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlog *atomic.Bool, log *logger.Logger, readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
) *levelDbManager { ) *levelDbManager {
result := &levelDbManager{ result := &levelDbManager{
databases: make([]*sharedDB, width), databases: make(map[uint64]*sharedDB),
} dbMtx: &sync.RWMutex{},
for idx := uint64(0); idx < width; idx++ {
result.databases[idx] = newSharedDB(options, filepath.Join(rootPath, lvlPath, u64ToHexString(idx)), readOnly, metrics, openDBCounter, closedFlog, log) options: options,
path: filepath.Join(rootPath, lvlPath),
readOnly: readOnly,
metrics: metrics,
openDBCounter: openDBCounter,
closedFlag: closedFlag,
log: log,
} }
return result return result
} }
func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB { func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
res := m.getDBIfExists(idx)
if res != nil {
return res
}
return m.getOrCreateDB(idx)
}
func (m *levelDbManager) getDBIfExists(idx uint64) *sharedDB {
m.dbMtx.RLock()
defer m.dbMtx.RUnlock()
return m.databases[idx] return m.databases[idx]
} }
func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB {
m.dbMtx.Lock()
defer m.dbMtx.Unlock()
db := m.databases[idx]
if db != nil {
return db
}
db = newSharedDB(m.options, filepath.Join(m.path, u64ToHexStringExt(idx)), m.readOnly, m.metrics, m.openDBCounter, m.closedFlag, m.log)
m.databases[idx] = db
return db
}
func (m *levelDbManager) hasAnyDB() bool {
m.dbMtx.RLock()
defer m.dbMtx.RUnlock()
return len(m.databases) > 0
}
// dbManager manages the opening and closing of blobovnicza instances. // dbManager manages the opening and closing of blobovnicza instances.
// //
// The blobovnicza opens at the first request, closes after the last request. // The blobovnicza opens at the first request, closes after the last request.
@ -135,21 +217,19 @@ type dbManager struct {
closedFlag *atomic.Bool closedFlag *atomic.Bool
dbCounter *openDBCounter dbCounter *openDBCounter
rootPath string rootPath string
options []blobovnicza.Option options []blobovnicza.Option
readOnly bool readOnly bool
metrics blobovnicza.Metrics metrics blobovnicza.Metrics
leafWidth uint64 log *logger.Logger
log *logger.Logger
} }
func newDBManager(rootPath string, options []blobovnicza.Option, leafWidth uint64, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager { func newDBManager(rootPath string, options []blobovnicza.Option, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
return &dbManager{ return &dbManager{
rootPath: rootPath, rootPath: rootPath,
options: options, options: options,
readOnly: readOnly, readOnly: readOnly,
metrics: metrics, metrics: metrics,
leafWidth: leafWidth,
levelToManager: make(map[string]*levelDbManager), levelToManager: make(map[string]*levelDbManager),
levelToManagerGuard: &sync.RWMutex{}, levelToManagerGuard: &sync.RWMutex{},
log: log, log: log,
@ -165,6 +245,17 @@ func (m *dbManager) GetByPath(path string) *sharedDB {
return levelManager.GetByIndex(curIndex) return levelManager.GetByIndex(curIndex)
} }
func (m *dbManager) CleanResources(path string) {
lvlPath := filepath.Dir(path)
m.levelToManagerGuard.Lock()
defer m.levelToManagerGuard.Unlock()
if result, ok := m.levelToManager[lvlPath]; ok && !result.hasAnyDB() {
delete(m.levelToManager, lvlPath)
}
}
func (m *dbManager) Open() { func (m *dbManager) Open() {
m.closedFlag.Store(false) m.closedFlag.Store(false)
} }
@ -197,7 +288,7 @@ func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager {
return result return result
} }
result := newLevelDBManager(m.leafWidth, m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log) result := newLevelDBManager(m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
m.levelToManager[lvlPath] = result m.levelToManager[lvlPath] = result
return result return result
} }
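The sharedDB changes above swap a plain mutex for a sync.Cond so that CloseAndRemoveFile can wait until every other holder has released its reference before deleting the file. A self-contained sketch of that pattern with illustrative names (the caller of destroy is assumed to hold one reference itself, as Rebuild does via Open):

package refcount

import "sync"

// refCounted mirrors the sharedDB pattern: acquire and release maintain a
// counter under the condition variable's lock, release broadcasts when only
// the destroyer's reference remains, and destroy waits before tearing down.
type refCounted struct {
	cond     *sync.Cond
	refCount uint32
}

func newRefCounted() *refCounted {
	return &refCounted{cond: sync.NewCond(&sync.Mutex{})}
}

func (r *refCounted) acquire() {
	r.cond.L.Lock()
	defer r.cond.L.Unlock()
	r.refCount++
}

func (r *refCounted) release() {
	r.cond.L.Lock()
	defer r.cond.L.Unlock()
	r.refCount--
	if r.refCount == 1 {
		r.cond.Broadcast() // wake a destroyer waiting for the last holder
	}
}

func (r *refCounted) destroy() {
	r.cond.L.Lock()
	defer r.cond.L.Unlock()
	for r.refCount > 1 { // re-check: a new acquire may slip in between wakeups
		r.cond.Wait()
	}
	r.refCount = 0
}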

View file

@ -6,6 +6,13 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
) )
const (
rebuildStatusNotStarted = "not_started"
rebuildStatusRunning = "running"
rebuildStatusCompleted = "completed"
rebuildStatusFailed = "failed"
)
type Metrics interface { type Metrics interface {
Blobovnicza() blobovnicza.Metrics Blobovnicza() blobovnicza.Metrics
@ -14,6 +21,10 @@ type Metrics interface {
SetMode(readOnly bool) SetMode(readOnly bool)
Close() Close()
SetRebuildStatus(status string)
ObjectMoved(d time.Duration)
SetRebuildPercent(value uint32)
Delete(d time.Duration, success, withStorageID bool) Delete(d time.Duration, success, withStorageID bool)
Exists(d time.Duration, success, withStorageID bool) Exists(d time.Duration, success, withStorageID bool)
GetRange(d time.Duration, size int, success, withStorageID bool) GetRange(d time.Duration, size int, success, withStorageID bool)
@ -27,6 +38,9 @@ type noopMetrics struct{}
func (m *noopMetrics) SetParentID(string) {} func (m *noopMetrics) SetParentID(string) {}
func (m *noopMetrics) SetMode(bool) {} func (m *noopMetrics) SetMode(bool) {}
func (m *noopMetrics) Close() {} func (m *noopMetrics) Close() {}
func (m *noopMetrics) SetRebuildStatus(string) {}
func (m *noopMetrics) SetRebuildPercent(uint32) {}
func (m *noopMetrics) ObjectMoved(time.Duration) {}
func (m *noopMetrics) Delete(time.Duration, bool, bool) {} func (m *noopMetrics) Delete(time.Duration, bool, bool) {}
func (m *noopMetrics) Exists(time.Duration, bool, bool) {} func (m *noopMetrics) Exists(time.Duration, bool, bool) {}
func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {} func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}

View file

@ -2,6 +2,7 @@ package blobovniczatree
import ( import (
"io/fs" "io/fs"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
@ -10,39 +11,48 @@ import (
) )
type cfg struct { type cfg struct {
log *logger.Logger log *logger.Logger
perm fs.FileMode perm fs.FileMode
readOnly bool readOnly bool
rootPath string rootPath string
openedCacheSize int openedCacheSize int
blzShallowDepth uint64 blzShallowDepth uint64
blzShallowWidth uint64 blzShallowWidth uint64
blzLeafWidth uint64 blzLeafWidth uint64
compression *compression.Config compression *compression.Config
blzOpts []blobovnicza.Option blzOpts []blobovnicza.Option
// reportError is the function called when encountering disk errors. reportError func(string, error) // reportError is the function called when encountering disk errors.
reportError func(string, error) metrics Metrics
metrics Metrics waitBeforeDropDB time.Duration
blzInitWorkerCount int
blzMoveBatchSize int
createDBInAdvance bool
} }
type Option func(*cfg) type Option func(*cfg)
const ( const (
defaultPerm = 0o700 defaultPerm = 0o700
defaultOpenedCacheSize = 50 defaultOpenedCacheSize = 50
defaultBlzShallowDepth = 2 defaultBlzShallowDepth = 2
defaultBlzShallowWidth = 16 defaultBlzShallowWidth = 16
defaultWaitBeforeDropDB = 10 * time.Second
defaultBlzInitWorkerCount = 5
defaultBlzMoveBatchSize = 10000
) )
func initConfig(c *cfg) { func initConfig(c *cfg) {
*c = cfg{ *c = cfg{
log: &logger.Logger{Logger: zap.L()}, log: &logger.Logger{Logger: zap.L()},
perm: defaultPerm, perm: defaultPerm,
openedCacheSize: defaultOpenedCacheSize, openedCacheSize: defaultOpenedCacheSize,
blzShallowDepth: defaultBlzShallowDepth, blzShallowDepth: defaultBlzShallowDepth,
blzShallowWidth: defaultBlzShallowWidth, blzShallowWidth: defaultBlzShallowWidth,
reportError: func(string, error) {}, reportError: func(string, error) {},
metrics: &noopMetrics{}, metrics: &noopMetrics{},
waitBeforeDropDB: defaultWaitBeforeDropDB,
blzInitWorkerCount: defaultBlzInitWorkerCount,
blzMoveBatchSize: defaultBlzMoveBatchSize,
} }
} }
@ -106,3 +116,34 @@ func WithMetrics(m Metrics) Option {
c.metrics = m c.metrics = m
} }
} }
func WithWaitBeforeDropDB(t time.Duration) Option {
return func(c *cfg) {
c.waitBeforeDropDB = t
}
}
func WithMoveBatchSize(v int) Option {
return func(c *cfg) {
c.blzMoveBatchSize = v
}
}
// WithInitWorkerCount sets the maximum number of workers used to initialize the blobovnicza tree.
//
// Negative or zero value means no limit.
func WithInitWorkerCount(v int) Option {
if v <= 0 {
v = -1
}
return func(c *cfg) {
c.blzInitWorkerCount = v
}
}
// WithInitInAdvance returns an option to create blobovnicza tree DBs in advance.
func WithInitInAdvance(v bool) Option {
return func(c *cfg) {
c.createDBInAdvance = v
}
}
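Taken together, the options above compose as in the sketch below; the values are illustrative, and the *Blobovniczas return type follows the constructor used throughout this package:

func newTreeForRebuild(dir string) *Blobovniczas {
	return NewBlobovniczaTree(
		WithBlobovniczaShallowDepth(2),
		WithBlobovniczaShallowWidth(2),
		WithRootPath(dir),
		WithWaitBeforeDropDB(10*time.Second), // grace period for requests using old storage IDs
		WithMoveBatchSize(10000),             // objects moved per rebuild batch
		WithInitWorkerCount(5),               // concurrent openers during Init
		WithInitInAdvance(true),              // create all leaf DBs up front
	)
}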

View file

@ -70,7 +70,7 @@ func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRe
type putIterator struct { type putIterator struct {
B *Blobovniczas B *Blobovniczas
ID *blobovnicza.ID ID *ID
AllFull bool AllFull bool
PutPrm blobovnicza.PutPrm PutPrm blobovnicza.PutPrm
} }
@ -104,7 +104,7 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err) i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
} else { } else {
i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
zap.String("path", active.Path()), zap.String("path", active.SystemPath()),
zap.String("error", err.Error()), zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
} }
@ -112,8 +112,8 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
return false, nil return false, nil
} }
idx := u64FromHexString(filepath.Base(active.Path())) idx := u64FromHexString(filepath.Base(active.SystemPath()))
i.ID = blobovnicza.NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx)))) i.ID = NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))
return true, nil return true, nil
} }

View file

@ -0,0 +1,478 @@
package blobovniczatree
import (
"bytes"
"context"
"errors"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
errBatchFull = errors.New("batch full")
)
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
if b.readOnly {
return common.RebuildRes{}, common.ErrReadOnly
}
b.metrics.SetRebuildStatus(rebuildStatusRunning)
b.metrics.SetRebuildPercent(0)
success := true
defer func() {
if success {
b.metrics.SetRebuildStatus(rebuildStatusCompleted)
} else {
b.metrics.SetRebuildStatus(rebuildStatusFailed)
}
}()
b.rebuildGuard.Lock()
defer b.rebuildGuard.Unlock()
var res common.RebuildRes
b.log.Debug(logs.BlobovniczaTreeCompletingPreviousRebuild)
completedPreviousMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
res.ObjectsMoved += completedPreviousMoves
if err != nil {
b.log.Warn(logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
success = false
return res, err
}
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
dbsToMigrate, err := b.getDBsToRebuild(ctx)
if err != nil {
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
success = false
return res, err
}
b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
res, err = b.migrateDBs(ctx, dbsToMigrate, prm, res)
if err != nil {
success = false
}
return res, err
}
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
var completedDBCount uint32
for _, db := range dbs {
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
res.ObjectsMoved += movedObjects
if err != nil {
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
return res, err
}
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
res.FilesRemoved++
completedDBCount++
b.metrics.SetRebuildPercent((100 * completedDBCount) / uint32(len(dbs)))
}
b.metrics.SetRebuildPercent(100)
return res, nil
}
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
dbsToMigrate := make(map[string]struct{})
if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
dbsToMigrate[s] = struct{}{}
return false, nil
}); err != nil {
return nil, err
}
if err := b.iterateSortedLeaves(ctx, nil, func(s string) (bool, error) {
delete(dbsToMigrate, s)
return false, nil
}); err != nil {
return nil, err
}
result := make([]string, 0, len(dbsToMigrate))
for db := range dbsToMigrate {
result = append(result, db)
}
return result, nil
}
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
shDB := b.getBlobovnicza(path)
blz, err := shDB.Open()
if err != nil {
return 0, err
}
shDBClosed := false
defer func() {
if shDBClosed {
return
}
shDB.Close()
}()
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
if err != nil {
return migratedObjects, err
}
shDBClosed, err = b.dropDB(ctx, path, shDB)
return migratedObjects, err
}
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
var result atomic.Uint64
batch := make(map[oid.Address][]byte)
var prm blobovnicza.IteratePrm
prm.DecodeAddresses()
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
batch[ie.Address()] = bytes.Clone(ie.ObjectData())
if len(batch) == b.blzMoveBatchSize {
return errBatchFull
}
return nil
})
for {
_, err := blz.Iterate(ctx, prm)
if err != nil && !errors.Is(err, errBatchFull) {
return result.Load(), err
}
if len(batch) == 0 {
break
}
eg, egCtx := errgroup.WithContext(ctx)
for addr, data := range batch {
addr := addr
data := data
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
_ = eg.Wait()
return result.Load(), err
}
eg.Go(func() error {
defer limiter.ReleaseWorkSlot()
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
if err == nil {
result.Add(1)
}
return err
})
}
if err := eg.Wait(); err != nil {
return result.Load(), err
}
batch = make(map[oid.Address][]byte)
}
return result.Load(), nil
}
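moveObjects fills a bounded batch through the iteration handler, fans it out across limiter-gated errgroup workers, and repeats until the source database yields nothing. A condensed, self-contained sketch of that fan-out, with a plain channel semaphore standing in for the limiter (names are illustrative, not the PR's API):

package batching

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// processBatch runs handle over items with at most cap(sem) workers in
// flight; a cancelled acquire drains the group before returning.
func processBatch(ctx context.Context, items [][]byte, sem chan struct{}, handle func(context.Context, []byte) error) error {
	eg, egCtx := errgroup.WithContext(ctx)
	for _, item := range items {
		item := item // capture per-iteration value (pre-Go 1.22 semantics)
		select {
		case sem <- struct{}{}: // acquire a work slot
		case <-egCtx.Done():
			_ = eg.Wait()
			return egCtx.Err()
		}
		eg.Go(func() error {
			defer func() { <-sem }() // release the slot
			return handle(egCtx, item)
		})
	}
	return eg.Wait()
}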
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
addr oid.Address, data []byte, metaStore common.MetaStorage,
) error {
startedAt := time.Now()
defer func() {
b.metrics.ObjectMoved(time.Since(startedAt))
}()
it := &moveIterator{
B: b,
ID: nil,
AllFull: true,
Address: addr,
ObjectData: data,
MetaStore: metaStore,
Source: source,
SourceSysPath: sourcePath,
}
if err := b.iterateDeepest(ctx, addr, func(lvlPath string) (bool, error) { return it.tryMoveToLvl(ctx, lvlPath) }); err != nil {
return err
} else if it.ID == nil {
if it.AllFull {
return common.ErrNoSpace
}
return errPutFailed
}
return nil
}
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
case <-time.After(b.waitBeforeDropDB): // wait for in-flight requests using the old storage ID to complete
}
b.dbCache.EvictAndMarkNonCached(path)
defer b.dbCache.RemoveFromNonCached(path)
b.dbFilesGuard.Lock()
defer b.dbFilesGuard.Unlock()
if err := shDb.CloseAndRemoveFile(); err != nil {
return false, err
}
b.commondbManager.CleanResources(path)
if err := b.dropDirectoryIfEmpty(filepath.Dir(path)); err != nil {
return true, err
}
return true, nil
}
func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
if path == "." {
return nil
}
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if err != nil {
return err
}
if len(entries) > 0 {
return nil
}
if err := os.Remove(sysPath); err != nil {
return err
}
return b.dropDirectoryIfEmpty(filepath.Dir(path))
}
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
var count uint64
return count, b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
shDB := b.getBlobovnicza(s)
blz, err := shDB.Open()
if err != nil {
return true, err
}
defer shDB.Close()
incompleteMoves, err := blz.ListMoveInfo(ctx)
if err != nil {
return true, err
}
for _, move := range incompleteMoves {
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
return true, err
}
count++
}
return false, nil
})
}
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
move blobovnicza.MoveInfo, metaStore common.MetaStorage,
) error {
targetDB := b.getBlobovnicza(NewIDFromBytes(move.TargetStorageID).Path())
target, err := targetDB.Open()
if err != nil {
return err
}
defer targetDB.Close()
existsInSource := true
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(move.Address)
gRes, err := source.Get(ctx, gPrm)
if err != nil {
if client.IsErrObjectNotFound(err) {
existsInSource = false
} else {
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
return err
}
}
if !existsInSource { // object was deleted by Rebuild, need to delete move info
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
return err
}
b.deleteProtectedObjects.Delete(move.Address)
return nil
}
existsInTarget, err := target.Exists(ctx, move.Address)
if err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
return err
}
if !existsInTarget {
var putPrm blobovnicza.PutPrm
putPrm.SetAddress(move.Address)
putPrm.SetMarshaledObject(gRes.Object())
_, err = target.Put(ctx, putPrm)
if err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToTargetDB, zap.String("path", targetDB.SystemPath()), zap.Error(err))
return err
}
}
if err = metaStore.UpdateStorageID(ctx, move.Address, move.TargetStorageID); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", move.Address))
return err
}
var deletePrm blobovnicza.DeletePrm
deletePrm.SetAddress(move.Address)
if _, err = source.Delete(ctx, deletePrm); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", sourcePath), zap.Error(err))
return err
}
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
return err
}
b.deleteProtectedObjects.Delete(move.Address)
return nil
}
type moveIterator struct {
B *Blobovniczas
ID *ID
AllFull bool
Address oid.Address
ObjectData []byte
MetaStore common.MetaStorage
Source *blobovnicza.Blobovnicza
SourceSysPath string
}
func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, error) {
target, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
if err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, zap.Error(err))
}
return false, nil
}
if target == nil {
i.B.log.Warn(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
return false, nil
}
defer target.Close()
i.AllFull = false
targetIDx := u64FromHexString(filepath.Base(target.SystemPath()))
targetStorageID := NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(targetIDx))))
if err = i.Source.PutMoveInfo(ctx, blobovnicza.MoveInfo{
Address: i.Address,
TargetStorageID: targetStorageID.Bytes(),
}); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
i.B.deleteProtectedObjects.Add(i.Address)
var putPrm blobovnicza.PutPrm
putPrm.SetAddress(i.Address)
putPrm.SetMarshaledObject(i.ObjectData)
_, err = target.Blobovnicza().Put(ctx, putPrm)
if err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, zap.String("path", target.SystemPath()), zap.Error(err))
}
return true, nil
}
if err = i.MetaStore.UpdateStorageID(ctx, i.Address, targetStorageID.Bytes()); err != nil {
i.B.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", i.Address))
return true, nil
}
var deletePrm blobovnicza.DeletePrm
deletePrm.SetAddress(i.Address)
if _, err = i.Source.Delete(ctx, deletePrm); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotDeleteFromSource, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
if err = i.Source.DropMoveInfo(ctx, i.Address); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotDropMoveInfo, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
i.B.deleteProtectedObjects.Delete(i.Address)
i.ID = targetStorageID
return true, nil
}
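tryMoveToLvl above is a write-ahead move: the MoveInfo record written first makes the whole sequence replayable by completeIncompletedMove after a crash, and each later step is idempotent or guarded by an existence check. Condensed to comments (illustrative, not the PR's API):

// 1. source.PutMoveInfo(addr, target)   // write-ahead record, replayed on restart
// 2. target.Put(addr, data)             // replay-safe: performMove checks target.Exists first
// 3. meta.UpdateStorageID(addr, target) // readers switch to the new location
// 4. source.Delete(addr)                // old copy removed
// 5. source.DropMoveInfo(addr)          // write-ahead record cleared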
type addressMap struct {
data map[oid.Address]struct{}
guard *sync.RWMutex
}
func newAddressMap() *addressMap {
return &addressMap{
data: make(map[oid.Address]struct{}),
guard: &sync.RWMutex{},
}
}
func (m *addressMap) Add(address oid.Address) {
m.guard.Lock()
defer m.guard.Unlock()
m.data[address] = struct{}{}
}
func (m *addressMap) Delete(address oid.Address) {
m.guard.Lock()
defer m.guard.Unlock()
delete(m.data, address)
}
func (m *addressMap) Contains(address oid.Address) bool {
m.guard.RLock()
defer m.guard.RUnlock()
_, contains := m.data[address]
return contains
}

View file

@ -0,0 +1,195 @@
package blobovniczatree
import (
"bytes"
"context"
"path/filepath"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
func TestRebuildFailover(t *testing.T) {
t.Parallel()
t.Run("only move info saved", testRebuildFailoverOnlyMoveInfoSaved)
t.Run("object saved to target", testRebuildFailoverObjectSavedToTarget)
t.Run("object deleted from source", testRebuildFailoverObjectDeletedFromSource)
}
func testRebuildFailoverOnlyMoveInfoSaved(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, true)
}
func testRebuildFailoverObjectSavedToTarget(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, true)
}
func testRebuildFailoverObjectDeletedFromSource(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, false)
}
func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object, mustUpdateStorageID bool) {
b := NewBlobovniczaTree(
WithLogger(test.NewLogger(t, true)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(2),
WithBlobovniczaShallowDepth(2),
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
var dPrm common.DeletePrm
dPrm.Address = object.AddressOf(obj)
dPrm.StorageID = []byte("0/0/1")
_, err := b.Delete(context.Background(), dPrm)
require.ErrorIs(t, err, errObjectIsDeleteProtected)
metaStub := &storageIDUpdateStub{
storageIDs: make(map[oid.Address][]byte),
guard: &sync.Mutex{},
}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
})
require.NoError(t, err)
require.Equal(t, uint64(1), rRes.ObjectsMoved)
require.Equal(t, uint64(0), rRes.FilesRemoved)
require.NoError(t, b.Close())
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
moveInfo, err := blz.ListMoveInfo(context.Background())
require.NoError(t, err)
require.Equal(t, 0, len(moveInfo))
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(object.AddressOf(obj))
_, err = blz.Get(context.Background(), gPrm)
require.True(t, client.IsErrObjectNotFound(err))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
moveInfo, err = blz.ListMoveInfo(context.Background())
require.NoError(t, err)
require.Equal(t, 0, len(moveInfo))
gRes, err := blz.Get(context.Background(), gPrm)
require.NoError(t, err)
require.True(t, len(gRes.Object()) > 0)
if mustUpdateStorageID {
require.True(t, bytes.Equal([]byte("0/0/0"), metaStub.storageIDs[object.AddressOf(obj)]))
}
require.NoError(t, blz.Close())
}

View file

@ -0,0 +1,145 @@
package blobovniczatree
import (
"context"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestBlobovniczaTreeRebuild(t *testing.T) {
t.Parallel()
t.Run("width increased", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 3, false)
})
t.Run("width reduced", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 1, true)
})
t.Run("depth increased", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 1, 2, 2, 2, true)
})
t.Run("depth reduced", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 1, 2, true)
})
}
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
dir := t.TempDir()
b := NewBlobovniczaTree(
WithLogger(test.NewLogger(t, true)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(sourceWidth),
WithBlobovniczaShallowDepth(sourceDepth),
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
eg, egCtx := errgroup.WithContext(context.Background())
storageIDs := make(map[oid.Address][]byte)
storageIDsGuard := &sync.Mutex{}
for i := 0; i < 1000; i++ {
eg.Go(func() error {
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
if err != nil {
return err
}
var prm common.PutPrm
prm.Address = object.AddressOf(obj)
prm.RawData = data
res, err := b.Put(egCtx, prm)
if err != nil {
return err
}
storageIDsGuard.Lock()
storageIDs[prm.Address] = res.StorageID
storageIDsGuard.Unlock()
return nil
})
}
require.NoError(t, eg.Wait())
require.NoError(t, b.Close())
b = NewBlobovniczaTree(
WithLogger(test.NewLogger(t, true)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(targetWidth),
WithBlobovniczaShallowDepth(targetDepth),
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
metaStub := &storageIDUpdateStub{
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub
rPrm.WorkerLimiter = &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
require.Equal(t, shouldMigrate, dataMigrated)
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
require.NoError(t, b.Close())
}
type storageIDUpdateStub struct {
guard *sync.Mutex
storageIDs map[oid.Address][]byte
updatedCount uint64
}
func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
s.guard.Lock()
defer s.guard.Unlock()
s.storageIDs[addr] = storageID
s.updatedCount++
return nil
}
type rebuildLimiterStub struct{}
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
func (s *rebuildLimiterStub) ReleaseWorkSlot() {}

View file

@ -0,0 +1,26 @@
package common
import (
"context"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type RebuildRes struct {
ObjectsMoved uint64
FilesRemoved uint64
}
type RebuildPrm struct {
MetaStorage MetaStorage
WorkerLimiter ConcurrentWorkersLimiter
}
type MetaStorage interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
}
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}

View file

@ -30,4 +30,5 @@ type Storage interface {
Put(context.Context, PutPrm) (PutRes, error) Put(context.Context, PutPrm) (PutRes, error)
Delete(context.Context, DeletePrm) (DeleteRes, error) Delete(context.Context, DeletePrm) (DeleteRes, error)
Iterate(context.Context, IteratePrm) (IterateRes, error) Iterate(context.Context, IteratePrm) (IterateRes, error)
Rebuild(context.Context, RebuildPrm) (RebuildRes, error)
} }

View file

@ -574,3 +574,7 @@ func (t *FSTree) SetReportErrorFunc(_ func(string, error)) {
func (t *FSTree) SetParentID(parentID string) { func (t *FSTree) SetParentID(parentID string) {
t.metrics.SetParentID(parentID) t.metrics.SetParentID(parentID)
} }
func (t *FSTree) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}

View file

@ -159,3 +159,7 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
} }
return common.IterateRes{}, nil return common.IterateRes{}, nil
} }
func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}

View file

@ -0,0 +1,45 @@
package blobstor
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type StorageIDUpdate interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
}
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter) error {
var summary common.RebuildRes
var rErr error
for _, storage := range b.storage {
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
MetaStorage: upd,
WorkerLimiter: limiter,
})
summary.FilesRemoved += res.FilesRemoved
summary.ObjectsMoved += res.ObjectsMoved
if err != nil {
b.log.Error(logs.BlobstorRebuildFailedToRebuildStorages,
zap.String("failed_storage_path", storage.Storage.Path()),
zap.String("failed_storage_type", storage.Storage.Type()),
zap.Error(err))
rErr = err
break
}
}
b.log.Info(logs.BlobstorRebuildRebuildStoragesCompleted,
zap.Bool("success", rErr == nil),
zap.Uint64("total_files_removed", summary.FilesRemoved),
zap.Uint64("total_objects_moved", summary.ObjectsMoved))
return rErr
}

View file

@ -229,3 +229,7 @@ func (s *TestStore) Iterate(ctx context.Context, req common.IteratePrm) (common.
} }
func (s *TestStore) SetParentID(string) {} func (s *TestStore) SetParentID(string) {}
func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}

View file

@ -38,6 +38,7 @@ type StorageEngine struct {
err error err error
} }
evacuateLimiter *evacuationLimiter evacuateLimiter *evacuationLimiter
rebuildLimiter *rebuildLimiter
} }
type shardWrapper struct { type shardWrapper struct {
@ -215,13 +216,15 @@ type cfg struct {
shardPoolSize uint32 shardPoolSize uint32
lowMem bool lowMem bool
rebuildWorkersCount uint32
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
return &cfg{ return &cfg{
log: &logger.Logger{Logger: zap.L()}, log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
shardPoolSize: 20, rebuildWorkersCount: 100,
} }
} }
@ -240,6 +243,7 @@ func New(opts ...Option) *StorageEngine {
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest), setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{}, evacuateLimiter: &evacuationLimiter{},
rebuildLimiter: newRebuildLimiter(c.rebuildWorkersCount),
} }
} }
@ -277,3 +281,10 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
c.lowMem = lowMemCons c.lowMem = lowMemCons
} }
} }
// WithRebuildWorkersCount returns an option to set the count of concurrent rebuild workers.
func WithRebuildWorkersCount(count uint32) Option {
return func(c *cfg) {
c.rebuildWorkersCount = count
}
}

View file

@ -0,0 +1,26 @@
package engine
import "context"
type rebuildLimiter struct {
semaphore chan struct{}
}
func newRebuildLimiter(workersCount uint32) *rebuildLimiter {
return &rebuildLimiter{
semaphore: make(chan struct{}, workersCount),
}
}
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
select {
case l.semaphore <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (l *rebuildLimiter) ReleaseWorkSlot() {
<-l.semaphore
}
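Callers follow the acquire/defer-release idiom; a small wrapper sketch over the limiter above (illustrative, not part of the PR):

func doLimited(ctx context.Context, l *rebuildLimiter, work func() error) error {
	if err := l.AcquireWorkSlot(ctx); err != nil {
		return err
	}
	defer l.ReleaseWorkSlot()
	return work()
}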

View file

@ -118,6 +118,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
shard.WithExpiredLocksCallback(e.processExpiredLocks), shard.WithExpiredLocksCallback(e.processExpiredLocks),
shard.WithDeletedLockCallback(e.processDeletedLocks), shard.WithDeletedLockCallback(e.processDeletedLocks),
shard.WithReportErrorFunc(e.reportShardErrorBackground), shard.WithReportErrorFunc(e.reportShardErrorBackground),
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
)...) )...)
if err := sh.UpdateID(ctx); err != nil { if err := sh.UpdateID(ctx); err != nil {

View file

@ -42,6 +42,18 @@ func (m *blobovniczaTreeMetrics) Close() {
m.m.CloseBlobobvnizcaTree(m.shardID, m.path) m.m.CloseBlobobvnizcaTree(m.shardID, m.path)
} }
func (m *blobovniczaTreeMetrics) SetRebuildStatus(status string) {
m.m.BlobovniczaTreeRebuildStatus(m.shardID, m.path, status)
}
func (m *blobovniczaTreeMetrics) SetRebuildPercent(value uint32) {
m.m.BlobovniczaTreeRebuildPercent(m.shardID, m.path, value)
}
func (m *blobovniczaTreeMetrics) ObjectMoved(d time.Duration) {
m.m.BlobovniczaTreeObjectMoved(m.shardID, m.path, d)
}
func (m *blobovniczaTreeMetrics) Delete(d time.Duration, success, withStorageID bool) { func (m *blobovniczaTreeMetrics) Delete(d time.Duration, success, withStorageID bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID}) m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
} }

View file

@ -162,6 +162,9 @@ func (s *Shard) Init(ctx context.Context) error {
s.gc.init(ctx) s.gc.init(ctx)
s.rb = newRebuilder(s.rebuildLimiter)
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
return nil return nil
} }
@ -266,6 +269,9 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
// Close releases all Shard's components. // Close releases all Shard's components.
func (s *Shard) Close() error { func (s *Shard) Close() error {
if s.rb != nil {
s.rb.Stop(s.log)
}
components := []interface{ Close() error }{} components := []interface{ Close() error }{}
if s.pilorama != nil { if s.pilorama != nil {
@ -310,6 +316,11 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
unlock := s.lockExclusive() unlock := s.lockExclusive()
defer unlock() defer unlock()
s.rb.Stop(s.log)
defer func() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
}()
ok, err := s.metaBase.Reload(c.metaOpts...) ok, err := s.metaBase.Reload(c.metaOpts...)
if err != nil { if err != nil {
if errors.Is(err, meta.ErrDegradedMode) { if errors.Is(err, meta.ErrDegradedMode) {

View file

@ -0,0 +1,13 @@
package shard
import "context"
type RebuildWorkerLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
type noopRebuildLimiter struct{}
func (l *noopRebuildLimiter) AcquireWorkSlot(context.Context) error { return nil }
func (l *noopRebuildLimiter) ReleaseWorkSlot() {}

View file

@ -0,0 +1,98 @@
package shard
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type rebuilder struct {
mtx *sync.Mutex
wg *sync.WaitGroup
cancel func()
limiter RebuildWorkerLimiter
}
func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
return &rebuilder{
mtx: &sync.Mutex{},
wg: &sync.WaitGroup{},
cancel: nil,
limiter: l,
}
}
func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.start(ctx, bs, mb, log)
}
func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
if r.cancel != nil {
r.stop(log)
}
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.wg.Add(1)
go func() {
defer r.wg.Done()
log.Info(logs.BlobstoreRebuildStarted)
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, r.limiter); err != nil {
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
} else {
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
}
}()
}
func (r *rebuilder) Stop(log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.stop(log)
}
func (r *rebuilder) stop(log *logger.Logger) {
if r.cancel == nil {
return
}
r.cancel()
r.wg.Wait()
r.cancel = nil
log.Info(logs.BlobstoreRebuildStopped)
}
var errMBIsNotAvailable = errors.New("metabase is not available")
type mbStorageIDUpdate struct {
mb *meta.DB
}
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if u.mb == nil {
return errMBIsNotAvailable
}
var prm meta.UpdateStorageIDPrm
prm.SetAddress(addr)
prm.SetStorageID(storageID)
_, err := u.mb.UpdateStorageID(ctx, prm)
return err
}
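The shard wires the rebuilder into its lifecycle in the Init/Close/Reload hunks above; in isolation the pattern reduces to the sketch below (illustrative; the names follow this file's imports):

func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, l RebuildWorkerLimiter, log *logger.Logger) {
	rb := newRebuilder(l)
	rb.Start(ctx, bs, mb, log) // spawns the background rebuild goroutine
	defer rb.Stop(log)         // cancels it and waits for completion
	<-ctx.Done()               // illustrative: hold until the caller cancels
}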

View file

@ -39,6 +39,8 @@ type Shard struct {
tsSource TombstoneSource tsSource TombstoneSource
rb *rebuilder
gcCancel atomic.Value gcCancel atomic.Value
setModeRequested atomic.Bool setModeRequested atomic.Bool
} }
@ -125,6 +127,8 @@ type cfg struct {
metricsWriter MetricsWriter metricsWriter MetricsWriter
reportErrorFunc func(selfID string, message string, err error) reportErrorFunc func(selfID string, message string, err error)
rebuildLimiter RebuildWorkerLimiter
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -133,6 +137,7 @@ func defaultCfg() *cfg {
log: &logger.Logger{Logger: zap.L()}, log: &logger.Logger{Logger: zap.L()},
gcCfg: defaultGCCfg(), gcCfg: defaultGCCfg(),
reportErrorFunc: func(string, string, error) {}, reportErrorFunc: func(string, string, error) {},
rebuildLimiter: &noopRebuildLimiter{},
} }
} }
@ -370,6 +375,14 @@ func WithExpiredCollectorWorkerCount(count int) Option {
} }
} }
// WithRebuildWorkerLimiter returns an option to set the concurrent
// worker count of the storage rebuild operation.
func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
return func(c *cfg) {
c.rebuildLimiter = l
}
}
func (s *Shard) fillInfo() { func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()

View file

@ -23,16 +23,23 @@ type BlobobvnizcaMetrics interface {
IncOpenBlobovniczaCount(shardID, path string) IncOpenBlobovniczaCount(shardID, path string)
DecOpenBlobovniczaCount(shardID, path string) DecOpenBlobovniczaCount(shardID, path string)
BlobovniczaTreeRebuildStatus(shardID, path, status string)
BlobovniczaTreeRebuildPercent(shardID, path string, value uint32)
BlobovniczaTreeObjectMoved(shardID, path string, d time.Duration)
} }
type blobovnicza struct { type blobovnicza struct {
treeMode *shardIDPathModeValue treeMode *shardIDPathModeValue
treeReqDuration *prometheus.HistogramVec treeReqDuration *prometheus.HistogramVec
treePut *prometheus.CounterVec treePut *prometheus.CounterVec
treeGet *prometheus.CounterVec treeGet *prometheus.CounterVec
treeOpenSize *prometheus.GaugeVec treeOpenSize *prometheus.GaugeVec
treeOpenItems *prometheus.GaugeVec treeOpenItems *prometheus.GaugeVec
treeOpenCounter *prometheus.GaugeVec treeOpenCounter *prometheus.GaugeVec
treeObjectMoveDuration *prometheus.HistogramVec
treeRebuildStatus *shardIDPathModeValue
treeRebuildPercent *prometheus.GaugeVec
} }
func newBlobovnicza() *blobovnicza { func newBlobovnicza() *blobovnicza {
@ -75,6 +82,19 @@ func newBlobovnicza() *blobovnicza {
Name: "open_blobovnicza_count", Name: "open_blobovnicza_count",
Help: "Count of opened blobovniczas of Blobovnicza tree", Help: "Count of opened blobovniczas of Blobovnicza tree",
}, []string{shardIDLabel, pathLabel}), }, []string{shardIDLabel, pathLabel}),
treeObjectMoveDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: blobovniczaTreeSubSystem,
Name: "object_move_duration_seconds",
Help: "Accumulated Blobovnicza tree object move duration",
}, []string{shardIDLabel, pathLabel}),
treeRebuildStatus: newShardIDPathMode(blobovniczaTreeSubSystem, "rebuild_status", "Blobovnicza tree rebuild status"),
treeRebuildPercent: metrics.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: blobovniczaTreeSubSystem,
Name: "rebuild_complete_percent",
Help: "Percent of rebuild completeness",
}, []string{shardIDLabel, pathLabel}),
} }
} }
@ -96,6 +116,15 @@ func (b *blobovnicza) CloseBlobobvnizcaTree(shardID, path string) {
shardIDLabel: shardID, shardIDLabel: shardID,
pathLabel: path, pathLabel: path,
}) })
b.treeObjectMoveDuration.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.treeRebuildPercent.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.treeRebuildStatus.SetMode(shardID, path, undefinedStatus)
} }
func (b *blobovnicza) BlobobvnizcaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) { func (b *blobovnicza) BlobobvnizcaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) {
@ -163,3 +192,21 @@ func (b *blobovnicza) SubOpenBlobovniczaItems(shardID, path string, items uint64
pathLabel: path, pathLabel: path,
}).Sub(float64(items)) }).Sub(float64(items))
} }
func (b *blobovnicza) BlobovniczaTreeRebuildStatus(shardID, path, status string) {
b.treeRebuildStatus.SetMode(shardID, path, status)
}
func (b *blobovnicza) BlobovniczaTreeObjectMoved(shardID, path string, d time.Duration) {
b.treeObjectMoveDuration.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Observe(d.Seconds())
}
func (b *blobovnicza) BlobovniczaTreeRebuildPercent(shardID, path string, value uint32) {
b.treeRebuildPercent.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Set(float64(value))
}

View file

@ -45,4 +45,5 @@ const (
failedToDeleteStatus = "failed_to_delete" failedToDeleteStatus = "failed_to_delete"
deletedStatus = "deleted" deletedStatus = "deleted"
undefinedStatus = "undefined"
) )