Blobovnicza tree rebuild #812

Merged
fyrchik merged 17 commits from dstepanov-yadro/frostfs-node:feat/shard_migrator_master into master 2024-09-04 19:51:04 +00:00
55 changed files with 2276 additions and 131 deletions

View file

@ -5,7 +5,7 @@ import (
"fmt"
common "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-lens/internal"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -40,7 +40,7 @@ func inspectFunc(cmd *cobra.Command, _ []string) {
common.ExitOnErr(cmd, common.Errf("could not check if the obj is small: %w", err))
if id := resStorageID.StorageID(); id != nil {
cmd.Printf("Object storageID: %s\n\n", blobovnicza.NewIDFromBytes(id).String())
cmd.Printf("Object storageID: %s\n\n", blobovniczatree.NewIDFromBytes(id).Path())
} else {
cmd.Printf("Object does not contain storageID\n\n")
}

View file

@ -103,6 +103,7 @@ type applicationConfiguration struct {
shardPoolSize uint32
shards []shardCfg
lowMem bool
rebuildWorkers uint32
}
}
@ -181,6 +182,8 @@ type subStorageCfg struct {
width uint64
leafWidth uint64
openedCacheSize int
initWorkerCount int
initInAdvance bool
}
// readConfig fills applicationConfiguration with raw configuration values
@ -212,6 +215,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}
@ -298,6 +302,8 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
sCfg.width = sub.ShallowWidth()
sCfg.leafWidth = sub.LeafWidth()
sCfg.openedCacheSize = sub.OpenedCacheSize()
sCfg.initWorkerCount = sub.InitWorkerCount()
sCfg.initInAdvance = sub.InitInAdvance()
case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth()
@ -701,13 +707,14 @@ func initCfgObject(appCfg *config.Config) cfgObject {
}
func (c *cfg) engineOpts() []engine.Option {
opts := make([]engine.Option, 0, 4)
var opts []engine.Option
opts = append(opts,
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers),
)
if c.metricsCollector != nil {
@ -796,7 +803,10 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
blobovniczatree.WithInitInAdvance(sRead.initInAdvance),
blobovniczatree.WithLogger(c.log),
blobovniczatree.WithObjectSizeLimit(shCfg.smallSizeObjectLimit),
}
if c.metricsCollector != nil {

View file

@ -15,6 +15,9 @@ const (
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
// process object PUT operations in a storage engine.
ShardPoolSizeDefault = 20
// RebuildWorkersCountDefault is a default value of the workers count to
// process storage rebuild operations in a storage engine.
RebuildWorkersCountDefault = 100
)
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@ -88,3 +91,11 @@ func ShardErrorThreshold(c *config.Config) uint32 {
func EngineLowMemoryConsumption(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "low_mem")
}
// EngineRebuildWorkersCount returns the value of "rebuild_workers_count" config parameter from "storage" section.
func EngineRebuildWorkersCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "rebuild_workers_count"); v > 0 {
return v
}
return RebuildWorkersCountDefault
}

View file

@ -38,6 +38,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
require.EqualValues(t, engineconfig.RebuildWorkersCountDefault, engineconfig.EngineRebuildWorkersCount(empty))
})
const path = "../../../../config/example/node"
@ -47,6 +48,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
require.EqualValues(t, uint32(1000), engineconfig.EngineRebuildWorkersCount(c))
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
defer func() {
@ -97,6 +99,8 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 4, blz.ShallowWidth())
require.EqualValues(t, 50, blz.OpenedCacheSize())
require.EqualValues(t, 10, blz.LeafWidth())
require.EqualValues(t, 10, blz.InitWorkerCount())
require.EqualValues(t, true, blz.InitInAdvance())
require.Equal(t, "tmp/0/blob", ss[1].Path())
require.EqualValues(t, 0o644, ss[1].Perm())
@ -146,6 +150,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 4, blz.ShallowWidth())
require.EqualValues(t, 50, blz.OpenedCacheSize())
require.EqualValues(t, 10, blz.LeafWidth())
require.EqualValues(t, blobovniczaconfig.InitWorkerCountDefault, blz.InitWorkerCount())
require.Equal(t, "tmp/1/blob", ss[1].Path())
require.EqualValues(t, 0o644, ss[1].Perm())

View file

@ -22,6 +22,9 @@ const (
// OpenedCacheSizeDefault is a default cache size of opened Blobovnicza's.
OpenedCacheSizeDefault = 16
// InitWorkerCountDefault is a default workers count to initialize Blobovnicza's.
InitWorkerCountDefault = 5
)
// From wraps config section into Config.
@ -112,3 +115,29 @@ func (x *Config) LeafWidth() uint64 {
"leaf_width",
)
}
// InitWorkerCount returns the value of "init_worker_count" config parameter.
//
// Returns InitWorkerCountDefault if the value is not a positive number.
func (x *Config) InitWorkerCount() int {
d := config.IntSafe(
(*config.Config)(x),
"init_worker_count",
)
if d > 0 {
return int(d)
}
return InitWorkerCountDefault
}
// InitInAdvance returns the value of "init_in_advance" config parameter.
//
// Returns false if the value is not defined or invalid.
func (x *Config) InitInAdvance() bool {
return config.BoolSafe(
(*config.Config)(x),
"init_in_advance",
)
}

View file

@ -92,6 +92,7 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
# Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
## 0 shard
### Flag to refill Metabase from BlobStor
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
@ -125,6 +126,8 @@ FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH=1
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH=4
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_LEAF_WIDTH=10
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_WORKER_COUNT=10
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_IN_ADVANCE=TRUE
### FSTree config
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE=fstree
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH=tmp/0/blob

View file

@ -137,6 +137,7 @@
"storage": {
"shard_pool_size": 15,
"shard_ro_error_threshold": 100,
"rebuild_workers_count": 1000,
"shard": {
"0": {
"mode": "read-only",
@ -172,7 +173,9 @@
"depth": 1,
"width": 4,
"opened_cache_capacity": 50,
"leaf_width": 10
"leaf_width": 10,
"init_worker_count": 10,
"init_in_advance": true
},
{
"type": "fstree",

View file

@ -116,6 +116,7 @@ storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
rebuild_workers_count: 1000 # number of concurrent workers for storage rebuild
shard:
default: # section with the default shard parameters
@ -184,6 +185,8 @@ storage:
blobstor:
- type: blobovnicza
path: tmp/0/blob/blobovnicza
init_worker_count: 10 # count of workers to initialize blobovniczas
init_in_advance: true
- type: fstree
path: tmp/0/blob # blobstor path

View file

@ -519,4 +519,36 @@ const (
FailedToCountWritecacheItems = "failed to count writecache items"
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
FailedToGetContainerCounters = "failed to get container counters values"
FailedToRebuildBlobstore = "failed to rebuild blobstore"
BlobstoreRebuildStarted = "blobstore rebuild started"
BlobstoreRebuildCompletedSuccessfully = "blobstore rebuild completed successfully"
BlobstoreRebuildStopped = "blobstore rebuild stopped"
BlobovniczaTreeFixingFileExtensions = "fixing blobovnicza tree file extensions..."
BlobovniczaTreeFixingFileExtensionsCompletedSuccessfully = "fixing blobovnicza tree file extensions completed successfully"
BlobovniczaTreeFixingFileExtensionsFailed = "failed to fix blobovnicza tree file extensions"
BlobovniczaTreeFixingFileExtensionForFile = "fixing blobovnicza file extension..."
BlobovniczaTreeFixingFileExtensionCompletedSuccessfully = "fixing blobovnicza file extension completed successfully"
BlobovniczaTreeFixingFileExtensionFailed = "failed to fix blobovnicza file extension"
BlobstorRebuildFailedToRebuildStorages = "failed to rebuild storages"
BlobstorRebuildRebuildStoragesCompleted = "storages rebuild completed"
BlobovniczaTreeCollectingDBToRebuild = "collecting blobovniczas to rebuild..."
BlobovniczaTreeCollectingDBToRebuildFailed = "collecting blobovniczas to rebuild failed"
BlobovniczaTreeCollectingDBToRebuildSuccess = "collecting blobovniczas to rebuild completed successfully"
BlobovniczaTreeRebuildingBlobovnicza = "rebuilding blobovnicza..."
BlobovniczaTreeRebuildingBlobovniczaFailed = "rebuilding blobovnicza failed"
BlobovniczaTreeRebuildingBlobovniczaSuccess = "rebuilding blobovnicza completed successfully"
BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza = "could not put move info to source blobovnicza"
BlobovniczatreeCouldNotUpdateStorageID = "could not update storage ID"
BlobovniczatreeCouldNotDropMoveInfo = "could not drop move info from source blobovnicza"
BlobovniczatreeCouldNotDeleteFromSource = "could not delete object from source blobovnicza"
BlobovniczaTreeCompletingPreviousRebuild = "completing previous rebuild if failed..."
BlobovniczaTreeCompletedPreviousRebuildSuccess = "previous rebuild completed successfully"
BlobovniczaTreeCompletedPreviousRebuildFailed = "failed to complete previous rebuild"
BlobovniczatreeCouldNotCheckExistenceInSourceDB = "could not check object existence in source blobovnicza"
BlobovniczatreeCouldNotCheckExistenceInTargetDB = "could not check object existence in target blobovnicza"
BlobovniczatreeCouldNotGetObjectFromSourceDB = "could not get object from source blobovnicza"
BlobovniczatreeCouldNotPutObjectToTargetDB = "could not put object to target blobovnicza"
BlobovniczaSavingCountersToMeta = "saving counters to blobovnicza's meta..."
BlobovniczaSavingCountersToMetaSuccess = "saving counters to blobovnicza's meta completed successfully"
BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed"
)

View file

@ -104,17 +104,42 @@ func (b *Blobovnicza) Init() error {
func (b *Blobovnicza) initializeCounters() error {
var size uint64
var items uint64
var sizeExists bool
var itemsCountExists bool
err := b.boltDB.View(func(tx *bbolt.Tx) error {
return b.iterateAllBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
keysN := uint64(b.Stats().KeyN)
size += keysN * upper
items += keysN
return false, nil
size, sizeExists = hasDataSize(tx)
items, itemsCountExists = hasItemsCount(tx)
if sizeExists && itemsCountExists {
return nil
}
return b.iterateAllDataBuckets(tx, func(lower, upper uint64, b *bbolt.Bucket) (bool, error) {
return false, b.ForEach(func(k, v []byte) error {
size += uint64(len(k) + len(v))
items++
return nil
})
})
})
if err != nil {
return fmt.Errorf("can't determine DB size: %w", err)
}
if (!sizeExists || !itemsCountExists) && !b.boltOptions.ReadOnly {
b.log.Debug(logs.BlobovniczaSavingCountersToMeta, zap.Uint64("size", size), zap.Uint64("items", items))
if err := b.boltDB.Update(func(tx *bbolt.Tx) error {
if err := saveDataSize(tx, size); err != nil {
return err
}
return saveItemsCount(tx, items)
}); err != nil {
b.log.Debug(logs.BlobovniczaSavingCountersToMetaFailed, zap.Uint64("size", size), zap.Uint64("items", items))
return fmt.Errorf("can't save blobovnicza's size and items count: %w", err)
}
b.log.Debug(logs.BlobovniczaSavingCountersToMetaSuccess, zap.Uint64("size", size), zap.Uint64("items", items))
}
b.dataSize.Store(size)
b.itemsCount.Store(items)
b.metrics.AddOpenBlobovniczaSize(size)

View file

@ -49,9 +49,10 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
var sizeUpperBound uint64
var sizeLowerBound uint64
var dataSize uint64
var recordSize uint64
err := b.boltDB.Update(func(tx *bbolt.Tx) error {
return b.iterateAllBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
err := b.iterateAllDataBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
objData := buck.Get(addrKey)
if objData == nil {
// object is not in bucket => continue iterating
@ -60,9 +61,27 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
dataSize = uint64(len(objData))
sizeLowerBound = lower
sizeUpperBound = upper
recordSize = dataSize + uint64(len(addrKey))
found = true
return true, buck.Delete(addrKey)
})
if err != nil {
return err
}
if found {
return updateMeta(tx, func(count, size uint64) (uint64, uint64) {
if count > 0 {
count--
}
if size >= recordSize {
size -= recordSize
} else {
size = 0
}
return count, size
})
}
return nil
})
if err == nil && !found {
@ -75,7 +94,7 @@ func (b *Blobovnicza) Delete(ctx context.Context, prm DeletePrm) (DeleteRes, err
zap.String("range", stringifyBounds(sizeLowerBound, sizeUpperBound)),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
b.itemDeleted(sizeUpperBound)
b.itemDeleted(recordSize)
}
return DeleteRes{}, err

View file

@ -24,7 +24,10 @@ func (b *Blobovnicza) Exists(ctx context.Context, addr oid.Address) (bool, error
addrKey := addressKey(addr)
err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
if isNonDataBucket(bucketName) {
return nil
}
exists = buck.Get(addrKey) != nil
if exists {
return errInterruptForEach

View file

@ -57,7 +57,11 @@ func (b *Blobovnicza) Get(ctx context.Context, prm GetPrm) (GetRes, error) {
)
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(_ []byte, buck *bbolt.Bucket) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
if isNonDataBucket(bucketName) {
return nil
}
data = buck.Get(addrKey)
if data == nil {
return nil

View file

@ -12,11 +12,11 @@ import (
"go.opentelemetry.io/otel/trace"
)
// iterateAllBuckets iterates all buckets in db
// iterateAllDataBuckets iterates over all data buckets in db
//
// If the maximum object size (b.objSizeLimit) has been lowered,
// then there may be more data buckets than the current object size limit implies.
func (b *Blobovnicza) iterateAllBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
func (b *Blobovnicza) iterateAllDataBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) {
buck := tx.Bucket(key)
if buck == nil {
@ -138,7 +138,10 @@ func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes,
var elem IterationElement
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(name []byte, buck *bbolt.Bucket) error {
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
if isNonDataBucket(bucketName) {
return nil
}
return buck.ForEach(func(k, v []byte) error {
select {
case <-ctx.Done():

View file

@ -0,0 +1,103 @@
package blobovnicza
import (
"bytes"
"encoding/binary"
"go.etcd.io/bbolt"
)
const (
dataSizeAndItemsCountBufLength = 8
)
var (
metaBucketName = []byte("META")
dataSizeKey = []byte("data_size")
itemsCountKey = []byte("items_count")
)
func isNonDataBucket(bucketName []byte) bool {
return bytes.Equal(bucketName, incompletedMoveBucketName) || bytes.Equal(bucketName, metaBucketName)
}
func hasDataSize(tx *bbolt.Tx) (uint64, bool) {
b := tx.Bucket(metaBucketName)
if b == nil {
return 0, false
}
v := b.Get(dataSizeKey)
if v == nil {
return 0, false
}
if len(v) != dataSizeAndItemsCountBufLength {
return 0, false
}
return binary.LittleEndian.Uint64(v), true
}
func hasItemsCount(tx *bbolt.Tx) (uint64, bool) {
b := tx.Bucket(metaBucketName)
if b == nil {
return 0, false
}
v := b.Get(itemsCountKey)
if v == nil {
return 0, false
}
if len(v) != dataSizeAndItemsCountBufLength {
return 0, false
}
return binary.LittleEndian.Uint64(v), true
}
func saveDataSize(tx *bbolt.Tx, size uint64) error {
b, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
buf := make([]byte, dataSizeAndItemsCountBufLength)
binary.LittleEndian.PutUint64(buf, size)
return b.Put(dataSizeKey, buf)
}
func saveItemsCount(tx *bbolt.Tx, count uint64) error {
b, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
buf := make([]byte, dataSizeAndItemsCountBufLength)
binary.LittleEndian.PutUint64(buf, count)
return b.Put(itemsCountKey, buf)
}
func updateMeta(tx *bbolt.Tx, updateValues func(count, size uint64) (uint64, uint64)) error {
b, err := tx.CreateBucketIfNotExists(metaBucketName)
if err != nil {
return err
}
var count uint64
var size uint64
v := b.Get(itemsCountKey)
if v != nil {
count = binary.LittleEndian.Uint64(v)
}
v = b.Get(dataSizeKey)
if v != nil {
size = binary.LittleEndian.Uint64(v)
}
count, size = updateValues(count, size)
sizeBuf := make([]byte, dataSizeAndItemsCountBufLength)
binary.LittleEndian.PutUint64(sizeBuf, size)
if err := b.Put(dataSizeKey, sizeBuf); err != nil {
return err
}
// bbolt keeps a reference to the value slice until commit, so the second key needs its own buffer
countBuf := make([]byte, dataSizeAndItemsCountBufLength)
binary.LittleEndian.PutUint64(countBuf, count)
return b.Put(itemsCountKey, countBuf)
}
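The META bucket persists the data size and item count so that a reopened blobovnicza can skip the full bucket scan in initializeCounters. Below is a minimal in-package sketch of that round trip; the temporary path and printed values are illustrative, and the usual imports (fmt, bbolt) are assumed.

func exampleMetaRoundTrip() error {
	db, err := bbolt.Open("/tmp/example-blobovnicza.db", 0o600, nil) // illustrative path
	if err != nil {
		return err
	}
	defer db.Close()
	// Persist counters the same way initializeCounters does after a full scan.
	if err := db.Update(func(tx *bbolt.Tx) error {
		if err := saveDataSize(tx, 1024); err != nil {
			return err
		}
		return saveItemsCount(tx, 2)
	}); err != nil {
		return err
	}
	// On the next open the counters are read back without iterating the data buckets.
	return db.View(func(tx *bbolt.Tx) error {
		size, ok := hasDataSize(tx)
		items, ok2 := hasItemsCount(tx)
		fmt.Println(size, ok, items, ok2) // 1024 true 2 true
		return nil
	})
}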

View file

@ -0,0 +1,108 @@
package blobovnicza
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var incompletedMoveBucketName = []byte("INCOMPLETED_MOVE")
type MoveInfo struct {
Address oid.Address
TargetStorageID []byte
}
func (b *Blobovnicza) PutMoveInfo(ctx context.Context, prm MoveInfo) error {
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.PutMoveInfo",
trace.WithAttributes(
attribute.String("path", b.path),
attribute.String("address", prm.Address.EncodeToString()),
attribute.String("target_storage_id", string(prm.TargetStorageID)),
))
defer span.End()
key := addressKey(prm.Address)
return b.boltDB.Update(func(tx *bbolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(incompletedMoveBucketName)
if err != nil {
return err
}
if err := bucket.Put(key, prm.TargetStorageID); err != nil {
return fmt.Errorf("(%T) failed to save move info: %w", b, err)
}
return nil
})
}
func (b *Blobovnicza) DropMoveInfo(ctx context.Context, address oid.Address) error {
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.DropMoveInfo",
trace.WithAttributes(
attribute.String("path", b.path),
attribute.String("address", address.EncodeToString()),
))
defer span.End()
key := addressKey(address)
return b.boltDB.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(incompletedMoveBucketName)
if bucket == nil {
return nil
}
if err := bucket.Delete(key); err != nil {
return fmt.Errorf("(%T) failed to drop move info: %w", b, err)
}
c := bucket.Cursor()
k, v := c.First()
bucketEmpty := k == nil && v == nil
if bucketEmpty {
return tx.DeleteBucket(incompletedMoveBucketName)
}
return nil
})
}
func (b *Blobovnicza) ListMoveInfo(ctx context.Context) ([]MoveInfo, error) {
_, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.ListMoveInfo",
trace.WithAttributes(
attribute.String("path", b.path),
))
defer span.End()
var result []MoveInfo
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(incompletedMoveBucketName)
if bucket == nil {
return nil
}
return bucket.ForEach(func(k, v []byte) error {
var addr oid.Address
storageID := make([]byte, len(v))
if err := addressFromKey(&addr, k); err != nil {
return err
}
copy(storageID, v)
result = append(result, MoveInfo{
Address: addr,
TargetStorageID: storageID,
})
return nil
})
}); err != nil {
return nil, err
}
return result, nil
}
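PutMoveInfo, ListMoveInfo and DropMoveInfo back the crash-safe object moves performed during rebuild: a record is written to the INCOMPLETED_MOVE bucket before the object is copied and dropped once the move finishes. The following is a rough usage sketch, not the actual rebuild code; the path, target storage ID and the oidtest helper are illustrative, and imports (context, fmt, blobovnicza, oidtest) are assumed.

func exampleMoveInfo(ctx context.Context) error {
	blz := blobovnicza.New(blobovnicza.WithPath("/tmp/example/0/0.db")) // illustrative path
	if err := blz.Open(); err != nil {
		return err
	}
	defer blz.Close()
	if err := blz.Init(); err != nil {
		return err
	}
	addr := oidtest.Address() // random test address
	// Record the pending move before copying the object to the target blobovnicza.
	err := blz.PutMoveInfo(ctx, blobovnicza.MoveInfo{
		Address:         addr,
		TargetStorageID: []byte("0/1/2"), // illustrative target
	})
	if err != nil {
		return err
	}
	// After a restart, pending moves can be listed and completed.
	moves, err := blz.ListMoveInfo(ctx)
	if err != nil {
		return err
	}
	fmt.Println(len(moves)) // 1
	// Once the object is safely stored at the target, drop the record.
	return blz.DropMoveInfo(ctx, addr)
}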

View file

@ -56,8 +56,9 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
defer span.End()
sz := uint64(len(prm.objData))
bucketName, upperBound := bucketForSize(sz)
bucketName := bucketForSize(sz)
key := addressKey(prm.addr)
recordSize := sz + uint64(len(key))
err := b.boltDB.Batch(func(tx *bbolt.Tx) error {
buck := tx.Bucket(bucketName)
@ -73,10 +74,12 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
return fmt.Errorf("(%T) could not save object in bucket: %w", b, err)
}
return nil
return updateMeta(tx, func(count, size uint64) (uint64, uint64) {
return count + 1, size + recordSize
})
})
if err == nil {
b.itemAdded(upperBound)
b.itemAdded(recordSize)
}
return PutRes{}, err

View file

@ -29,9 +29,8 @@ func bucketKeyFromBounds(upperBound uint64) []byte {
return buf[:ln]
}
func bucketForSize(sz uint64) ([]byte, uint64) {
upperBound := upperPowerOfTwo(sz)
return bucketKeyFromBounds(upperBound), upperBound
func bucketForSize(sz uint64) []byte {
return bucketKeyFromBounds(upperPowerOfTwo(sz))
}
func upperPowerOfTwo(v uint64) uint64 {

View file

@ -34,7 +34,7 @@ func TestSizes(t *testing.T) {
upperBound: 4 * firstBucketBound,
},
} {
key, _ := bucketForSize(item.sz)
key := bucketForSize(item.sz)
require.Equal(t, bucketKeyFromBounds(item.upperBound), key)
}
}

View file

@ -21,8 +21,8 @@ func (db *activeDB) Close() {
db.shDB.Close()
}
func (db *activeDB) Path() string {
return db.shDB.Path()
func (db *activeDB) SystemPath() string {
return db.shDB.SystemPath()
}
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
@ -154,7 +154,7 @@ func (m *activeDBManager) getNextSharedDB(lvlPath string) (*sharedDB, error) {
var next *sharedDB
for iterCount < m.leafWidth {
path := filepath.Join(lvlPath, u64ToHexString(idx))
path := filepath.Join(lvlPath, u64ToHexStringExt(idx))
shDB := m.dbManager.GetByPath(path)
db, err := shDB.Open() // open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close()
if err != nil {
@ -192,7 +192,7 @@ func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
if !ok {
return false, 0
}
return true, u64FromHexString(filepath.Base(db.Path()))
return true, u64FromHexString(filepath.Base(db.SystemPath()))
}
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {

View file

@ -4,6 +4,8 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
@ -54,15 +56,22 @@ import (
type Blobovniczas struct {
cfg
commondbManager *dbManager
activeDBManager *activeDBManager
dbCache *dbCache
commondbManager *dbManager
activeDBManager *activeDBManager
dbCache *dbCache
deleteProtectedObjects *addressMap
dbFilesGuard *sync.RWMutex
rebuildGuard *sync.RWMutex
}
var _ common.Storage = (*Blobovniczas)(nil)
var errPutFailed = errors.New("could not save the object in any blobovnicza")
const (
dbExtension = ".db"
)
// NewBlobovniczaTree returns new instance of blobovniczas tree.
func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
blz = new(Blobovniczas)
@ -76,9 +85,12 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
blz.blzLeafWidth = blz.blzShallowWidth
}
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.blzLeafWidth, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
blz.deleteProtectedObjects = newAddressMap()
blz.dbFilesGuard = &sync.RWMutex{}
blz.rebuildGuard = &sync.RWMutex{}
return blz
}
@ -94,14 +106,16 @@ func addressHash(addr *oid.Address, path string) uint64 {
return hrw.StringHash(a + path)
}
// converts uint64 to hex string.
func u64ToHexString(ind uint64) string {
return strconv.FormatUint(ind, 16)
}
func u64ToHexStringExt(ind uint64) string {
return strconv.FormatUint(ind, 16) + dbExtension
}
// converts uint64 hex string to uint64.
func u64FromHexString(str string) uint64 {
v, err := strconv.ParseUint(str, 16, 64)
v, err := strconv.ParseUint(strings.TrimSuffix(str, dbExtension), 16, 64)
if err != nil {
panic(fmt.Sprintf("blobovnicza name is not an index %s", str))
}
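With the new .db suffix, on-disk file names and storage IDs diverge: u64ToHexStringExt builds the file name, u64FromHexString accepts both forms, and ID.Path appends the extension when a storage ID is resolved to a path. A tiny in-package illustration; the values are examples and the usual imports are assumed.

func exampleHexNames() {
	fmt.Println(u64ToHexString(10))       // "a"    - directory name / storage ID part
	fmt.Println(u64ToHexStringExt(10))    // "a.db" - blobovnicza file name
	fmt.Println(u64FromHexString("a"))    // 10
	fmt.Println(u64FromHexString("a.db")) // 10, ".db" is trimmed before parsing
	fmt.Println(NewIDFromBytes([]byte("0/a")).Path()) // "0/a.db"
}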

View file

@ -15,8 +15,9 @@ import (
type dbCache struct {
cacheGuard *sync.RWMutex
cache simplelru.LRUCache[string, *sharedDB]
pathLock *utilSync.KeyLocker[string]
pathLock *utilSync.KeyLocker[string] // the order of locks is important: pathLock first, cacheGuard second
closed bool
nonCached map[string]struct{}
dbManager *dbManager
}
@ -34,6 +35,7 @@ func newDBCache(size int, dbManager *dbManager) *dbCache {
cache: cache,
dbManager: dbManager,
pathLock: utilSync.NewKeyLocker[string](),
nonCached: make(map[string]struct{}),
}
}
@ -59,6 +61,27 @@ func (c *dbCache) GetOrCreate(path string) *sharedDB {
return c.create(path)
}
func (c *dbCache) EvictAndMarkNonCached(path string) {
c.pathLock.Lock(path)
defer c.pathLock.Unlock(path)
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
c.cache.Remove(path)
c.nonCached[path] = struct{}{}
}
func (c *dbCache) RemoveFromNonCached(path string) {
c.pathLock.Lock(path)
defer c.pathLock.Unlock(path)
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
delete(c.nonCached, path)
}
func (c *dbCache) getExisted(path string) *sharedDB {
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
@ -94,7 +117,9 @@ func (c *dbCache) put(path string, db *sharedDB) bool {
c.cacheGuard.Lock()
defer c.cacheGuard.Unlock()
if !c.closed {
_, isNonCached := c.nonCached[path]
if !isNonCached && !c.closed {
c.cache.Add(path, db)
return true
}

View file

@ -2,15 +2,24 @@ package blobovniczatree
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var errFailedToChangeExtensionReadOnly = errors.New("failed to change blobovnicza extension: read only mode")
// Open opens blobovnicza tree.
func (b *Blobovniczas) Open(readOnly bool) error {
b.readOnly = readOnly
b.metrics.SetMode(readOnly)
b.metrics.SetRebuildStatus(rebuildStatusNotStarted)
b.openManagers()
return nil
}
@ -21,22 +30,84 @@ func (b *Blobovniczas) Open(readOnly bool) error {
func (b *Blobovniczas) Init() error {
b.log.Debug(logs.BlobovniczatreeInitializingBlobovniczas)
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensions)
if err := b.addDBExtensionToDBs(b.rootPath, 0); err != nil {
b.log.Error(logs.BlobovniczaTreeFixingFileExtensionsFailed, zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionsCompletedSuccessfully)
if b.readOnly {
b.log.Debug(logs.BlobovniczatreeReadonlyModeSkipBlobovniczasInitialization)
return nil
}
return b.iterateLeaves(context.TODO(), func(p string) (bool, error) {
shBlz := b.getBlobovniczaWithoutCaching(p)
_, err := shBlz.Open()
if err != nil {
return true, err
}
defer shBlz.Close()
return b.initializeDBs(context.TODO())
}
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
err := util.MkdirAllX(b.rootPath, b.perm)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(b.blzInitWorkerCount)
visited := make(map[string]struct{})
err = b.iterateExistingDBPaths(egCtx, func(p string) (bool, error) {
visited[p] = struct{}{}
eg.Go(func() error {
shBlz := b.getBlobovniczaWithoutCaching(p)
blz, err := shBlz.Open()
if err != nil {
return err
}
defer shBlz.Close()
moveInfo, err := blz.ListMoveInfo(egCtx)
if err != nil {
return err
}
for _, move := range moveInfo {
b.deleteProtectedObjects.Add(move.Address)
}
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return nil
})
return false, nil
})
if err != nil {
_ = eg.Wait()
return err
}
if b.createDBInAdvance {
err = b.iterateSortedLeaves(egCtx, nil, func(p string) (bool, error) {
if _, found := visited[p]; found {
return false, nil
}
eg.Go(func() error {
shBlz := b.getBlobovniczaWithoutCaching(p)
_, err := shBlz.Open()
if err != nil {
return err
}
defer shBlz.Close()
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return nil
})
return false, nil
})
if err != nil {
_ = eg.Wait()
return err
}
}
return eg.Wait()
}
func (b *Blobovniczas) openManagers() {
@ -64,3 +135,37 @@ func (b *Blobovniczas) getBlobovnicza(p string) *sharedDB {
func (b *Blobovniczas) getBlobovniczaWithoutCaching(p string) *sharedDB {
return b.commondbManager.GetByPath(p)
}
func (b *Blobovniczas) addDBExtensionToDBs(path string, depth uint64) error {
entries, err := os.ReadDir(path)
if os.IsNotExist(err) && depth == 0 {
return nil
}
for _, entry := range entries {
if entry.IsDir() {
if err := b.addDBExtensionToDBs(filepath.Join(path, entry.Name()), depth+1); err != nil {
return err
}
continue
}
if strings.HasSuffix(entry.Name(), dbExtension) {
continue
}
if b.readOnly {
return errFailedToChangeExtensionReadOnly
}
sourcePath := filepath.Join(path, entry.Name())
targetPath := filepath.Join(path, entry.Name()+dbExtension)
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionForFile, zap.String("source", sourcePath), zap.String("target", targetPath))
if err := os.Rename(sourcePath, targetPath); err != nil {
b.log.Error(logs.BlobovniczaTreeFixingFileExtensionFailed, zap.String("source", sourcePath), zap.String("target", targetPath), zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionCompletedSuccessfully, zap.String("source", sourcePath), zap.String("target", targetPath))
}
return nil
}

View file

@ -0,0 +1,189 @@
package blobovniczatree
import (
"context"
"os"
"path/filepath"
"strings"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"github.com/stretchr/testify/require"
)
func TestDBExtensionFix(t *testing.T) {
root := t.TempDir()
createTestTree(t, 0, 2, 3, root)
t.Run("adds suffix if not exists", func(t *testing.T) {
openAndCloseTestTree(t, 2, 3, root)
validateTestTree(t, root)
})
t.Run("not adds second suffix if exists", func(t *testing.T) {
openAndCloseTestTree(t, 2, 3, root)
validateTestTree(t, root)
})
}
func createTestTree(t *testing.T, currentDepth, depth, width uint64, path string) {
if currentDepth == depth {
var w uint64
for ; w < width; w++ {
dbPath := filepath.Join(path, u64ToHexString(w))
b := blobovnicza.New(blobovnicza.WithPath(dbPath))
require.NoError(t, b.Open())
require.NoError(t, b.Init())
require.NoError(t, b.Close())
}
return
}
var w uint64
for ; w < width; w++ {
createTestTree(t, currentDepth+1, depth, width, filepath.Join(path, u64ToHexString(w)))
}
}
func openAndCloseTestTree(t *testing.T, depth, width uint64, path string) {
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(depth),
WithBlobovniczaShallowWidth(width),
WithRootPath(path),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
require.NoError(t, blz.Close())
}
func validateTestTree(t *testing.T, path string) {
entries, err := os.ReadDir(path)
require.NoError(t, err)
for _, entry := range entries {
if entry.IsDir() {
validateTestTree(t, filepath.Join(path, entry.Name()))
} else {
require.True(t, strings.HasSuffix(entry.Name(), dbExtension))
require.False(t, strings.HasSuffix(strings.TrimSuffix(entry.Name(), dbExtension), dbExtension))
}
}
}
func TestObjectsAvailableAfterDepthAndWidthEdit(t *testing.T) {
t.Parallel()
rootDir := t.TempDir()
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
obj35 := blobstortest.NewObject(10 * 1024)
addr35 := objectCore.AddressOf(obj35)
raw, err := obj35.Marshal()
require.NoError(t, err)
pRes35, err := blz.Put(context.Background(), common.PutPrm{
Address: addr35,
Object: obj35,
RawData: raw,
})
require.NoError(t, err)
gRes, err := blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
require.NoError(t, blz.Close())
// change depth and width
blz = NewBlobovniczaTree(
WithBlobovniczaShallowDepth(5),
WithBlobovniczaShallowWidth(2),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
obj52 := blobstortest.NewObject(10 * 1024)
addr52 := objectCore.AddressOf(obj52)
raw, err = obj52.Marshal()
require.NoError(t, err)
pRes52, err := blz.Put(context.Background(), common.PutPrm{
Address: addr52,
Object: obj52,
RawData: raw,
})
require.NoError(t, err)
require.NoError(t, blz.Close())
// change depth and width back
blz = NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr52,
StorageID: pRes52.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj52, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr52,
})
require.NoError(t, err)
require.EqualValues(t, obj52, gRes.Object)
require.NoError(t, blz.Close())
}

View file

@ -3,6 +3,7 @@ package blobovniczatree
import (
"context"
"encoding/hex"
"errors"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -18,6 +19,8 @@ import (
"go.uber.org/zap"
)
var errObjectIsDeleteProtected = errors.New("object is delete protected")
// Delete deletes object from blobovnicza tree.
//
// If blobocvnicza ID is specified, only this blobovnicza is processed.
@ -43,12 +46,22 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
return common.DeleteRes{}, common.ErrReadOnly
}
if b.rebuildGuard.TryRLock() {
defer b.rebuildGuard.RUnlock()
} else {
return common.DeleteRes{}, errRebuildInProgress
}
if b.deleteProtectedObjects.Contains(prm.Address) {
return common.DeleteRes{}, errObjectIsDeleteProtected
}
var bPrm blobovnicza.DeletePrm
bPrm.SetAddress(prm.Address)
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String())
id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open()
if err != nil {
return res, err
@ -63,7 +76,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
objectFound := false
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.deleteObjectFromLevel(ctx, bPrm, p)
if err != nil {
if !client.IsErrObjectNotFound(err) {

View file

@ -36,8 +36,8 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
defer span.End()
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String())
id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open()
if err != nil {
return common.ExistsRes{}, err
@ -51,7 +51,7 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(prm.Address)
err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err := b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
_, err := b.getObjectFromLevel(ctx, gPrm, p)
if err != nil {
if !client.IsErrObjectNotFound(err) {

View file

@ -55,7 +55,7 @@ func TestExistsInvalidStorageID(t *testing.T) {
// An invalid boltdb file is created so that it returns an error when opened
require.NoError(t, os.MkdirAll(filepath.Join(dir, relBadFileDir), os.ModePerm))
require.NoError(t, os.WriteFile(filepath.Join(dir, relBadFileDir, badFileName), []byte("not a boltdb file content"), 0o777))
require.NoError(t, os.WriteFile(filepath.Join(dir, relBadFileDir, badFileName+".db"), []byte("not a boltdb file content"), 0o777))
res, err := b.Exists(context.Background(), common.ExistsPrm{Address: addr, StorageID: []byte(filepath.Join(relBadFileDir, badFileName))})
require.Error(t, err)

View file

@ -47,8 +47,8 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
bPrm.SetAddress(prm.Address)
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String())
id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open()
if err != nil {
return res, err
@ -63,7 +63,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
return res, err
}
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.getObjectFromLevel(ctx, bPrm, p)
if err != nil {
if !client.IsErrObjectNotFound(err) {

View file

@ -46,8 +46,8 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
defer span.End()
if prm.StorageID != nil {
id := blobovnicza.NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.String())
id := NewIDFromBytes(prm.StorageID)
shBlz := b.getBlobovnicza(id.Path())
blz, err := shBlz.Open()
if err != nil {
return common.GetRangeRes{}, err
@ -64,7 +64,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
objectFound := false
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.getRangeFromLevel(ctx, prm, p)
if err != nil {
outOfBounds := isErrOutOfRange(err)

View file

@ -1,4 +1,4 @@
package blobovnicza
package blobovniczatree
// ID represents Blobovnicza identifier.
type ID []byte
@ -8,8 +8,8 @@ func NewIDFromBytes(v []byte) *ID {
return (*ID)(&v)
}
func (id ID) String() string {
return string(id)
func (id ID) Path() string {
return string(id) + dbExtension
}
func (id ID) Bytes() []byte {

View file

@ -3,7 +3,9 @@ package blobovniczatree
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -54,7 +56,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
return prm.Handler(common.IterationElement{
Address: elem.Address(),
ObjectData: data,
StorageID: []byte(p),
StorageID: []byte(strings.TrimSuffix(p, dbExtension)),
})
}
return nil
@ -69,7 +71,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
return b.iterateLeaves(ctx, func(p string) (bool, error) {
return b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
shBlz := b.getBlobovnicza(p)
blz, err := shBlz.Open()
if err != nil {
@ -90,7 +92,9 @@ func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors boo
})
}
// iterator over the paths of Blobovniczas sorted by weight.
// iterateSortedLeaves iterates over the paths of Blobovniczas sorted by weight.
//
// Uses depth, width and leaf width for iteration.
func (b *Blobovniczas) iterateSortedLeaves(ctx context.Context, addr *oid.Address, f func(string) (bool, error)) error {
_, err := b.iterateSorted(
ctx,
@ -130,7 +134,9 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
}
indices := indexSlice(levelWidth)
hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...)))
if !isLeafLevel {
hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...)))
}
exec := uint64(len(curPath)) == execDepth
@ -140,10 +146,16 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
return false, ctx.Err()
default:
}
lastPart := u64ToHexString(indices[i])
if isLeafLevel {
lastPart = u64ToHexStringExt(indices[i])
}
if i == 0 {
curPath = append(curPath, u64ToHexString(indices[i]))
curPath = append(curPath, lastPart)
} else {
curPath[len(curPath)-1] = u64ToHexString(indices[i])
curPath[len(curPath)-1] = lastPart
}
if exec {
@ -162,9 +174,110 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
return false, nil
}
// iterator over the paths of Blobovniczas in random order.
func (b *Blobovniczas) iterateLeaves(ctx context.Context, f func(string) (bool, error)) error {
return b.iterateSortedLeaves(ctx, nil, f)
// iterateExistingDBPaths iterates over the paths of Blobovniczas without any order.
//
// Uses existing blobovnicza files for iteration.
func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string) (bool, error)) error {
b.dbFilesGuard.RLock()
defer b.dbFilesGuard.RUnlock()
_, err := b.iterateExistingDBPathsDFS(ctx, "", f)
return err
}
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) {
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
return false, nil
}
if err != nil {
return false, err
}
for _, entry := range entries {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
if entry.IsDir() {
stop, err := b.iterateExistingDBPathsDFS(ctx, filepath.Join(path, entry.Name()), f)
if err != nil {
return false, err
}
if stop {
return true, nil
}
} else {
stop, err := f(filepath.Join(path, entry.Name()))
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
return false, nil
}
func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error {
b.dbFilesGuard.RLock()
defer b.dbFilesGuard.RUnlock()
_, err := b.iterateSordedDBPathsInternal(ctx, "", addr, f)
return err
}
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
return false, nil
}
if err != nil {
return false, err
}
var dbIdxs []uint64
var dirIdxs []uint64
for _, entry := range entries {
idx := u64FromHexString(entry.Name())
if entry.IsDir() {
dirIdxs = append(dirIdxs, idx)
} else {
dbIdxs = append(dbIdxs, idx)
}
}
if len(dbIdxs) > 0 {
for _, dbIdx := range dbIdxs {
dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx))
stop, err := f(dbPath)
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
if len(dirIdxs) > 0 {
hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path))
for _, dirIdx := range dirIdxs {
dirPath := filepath.Join(path, u64ToHexString(dirIdx))
stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f)
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
return false, nil
}
// makes slice of uint64 values from 0 to number-1.

View file

@ -0,0 +1,42 @@
package blobovniczatree
import (
"context"
"testing"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func TestIterateSortedLeavesAndDBPathsAreSame(t *testing.T) {
t.Parallel()
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(t.TempDir()),
)
blz.createDBInAdvance = true
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
defer func() {
require.NoError(t, blz.Close())
}()
addr := oidtest.Address()
var leaves []string
var dbPaths []string
blz.iterateSortedLeaves(context.Background(), &addr, func(s string) (bool, error) {
leaves = append(leaves, s)
return false, nil
})
blz.iterateSortedDBPaths(context.Background(), addr, func(s string) (bool, error) {
dbPaths = append(dbPaths, s)
return false, nil
})
require.Equal(t, leaves, dbPaths)
}

View file

@ -1,7 +1,9 @@
package blobovniczatree
import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
@ -12,9 +14,11 @@ import (
"go.uber.org/zap"
)
var errClosingClosedBlobovnicza = errors.New("closing closed blobovnicza is not allowed")
// sharedDB is responsible for opening and closing a file of a single blobovnicza.
type sharedDB struct {
guard *sync.RWMutex
cond *sync.Cond
blcza *blobovnicza.Blobovnicza
refCount uint32
@ -31,8 +35,9 @@ func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
) *sharedDB {
return &sharedDB{
guard: &sync.RWMutex{},
cond: &sync.Cond{
L: &sync.RWMutex{},
},
options: options,
path: path,
readOnly: readOnly,
@ -48,8 +53,8 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
return nil, errClosed
}
b.guard.Lock()
defer b.guard.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()
if b.refCount > 0 {
b.refCount++
@ -77,11 +82,12 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
}
func (b *sharedDB) Close() {
b.guard.Lock()
defer b.guard.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()
if b.refCount == 0 {
b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
b.cond.Broadcast()
return
}
@ -99,33 +105,109 @@ func (b *sharedDB) Close() {
}
b.refCount--
if b.refCount == 1 {
b.cond.Broadcast()
}
}
func (b *sharedDB) Path() string {
func (b *sharedDB) CloseAndRemoveFile() error {
b.cond.L.Lock()
if b.refCount > 1 {
b.cond.Wait()
}
defer b.cond.L.Unlock()
if b.refCount == 0 {
return errClosingClosedBlobovnicza
}
if err := b.blcza.Close(); err != nil {
b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
zap.String("id", b.path),
zap.String("error", err.Error()),
)
return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err)
}
b.refCount = 0
b.blcza = nil
b.openDBCounter.Dec()
return os.Remove(b.path)
}
func (b *sharedDB) SystemPath() string {
return b.path
}
// levelDbManager stores pointers to the sharedDBs for the leaf directory of the blobovnicza tree.
type levelDbManager struct {
databases []*sharedDB
dbMtx *sync.RWMutex
databases map[uint64]*sharedDB
options []blobovnicza.Option
path string
readOnly bool
metrics blobovnicza.Metrics
openDBCounter *openDBCounter
closedFlag *atomic.Bool
log *logger.Logger
}
func newLevelDBManager(width uint64, options []blobovnicza.Option, rootPath string, lvlPath string,
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlog *atomic.Bool, log *logger.Logger,
func newLevelDBManager(options []blobovnicza.Option, rootPath string, lvlPath string,
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
) *levelDbManager {
result := &levelDbManager{
databases: make([]*sharedDB, width),
}
for idx := uint64(0); idx < width; idx++ {
result.databases[idx] = newSharedDB(options, filepath.Join(rootPath, lvlPath, u64ToHexString(idx)), readOnly, metrics, openDBCounter, closedFlog, log)
databases: make(map[uint64]*sharedDB),
dbMtx: &sync.RWMutex{},
options: options,
path: filepath.Join(rootPath, lvlPath),
readOnly: readOnly,
metrics: metrics,
openDBCounter: openDBCounter,
closedFlag: closedFlag,
log: log,
}
return result
}
func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
res := m.getDBIfExists(idx)
if res != nil {
return res
}
return m.getOrCreateDB(idx)
}
func (m *levelDbManager) getDBIfExists(idx uint64) *sharedDB {
m.dbMtx.RLock()
defer m.dbMtx.RUnlock()
return m.databases[idx]
}
func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB {
m.dbMtx.Lock()
defer m.dbMtx.Unlock()
db := m.databases[idx]
if db != nil {
return db
}
db = newSharedDB(m.options, filepath.Join(m.path, u64ToHexStringExt(idx)), m.readOnly, m.metrics, m.openDBCounter, m.closedFlag, m.log)
m.databases[idx] = db
return db
}
func (m *levelDbManager) hasAnyDB() bool {
m.dbMtx.RLock()
defer m.dbMtx.RUnlock()
return len(m.databases) > 0
}
// dbManager manages the opening and closing of blobovnicza instances.
//
// The blobovnicza is opened at the first request and closed after the last request.
@ -135,21 +217,19 @@ type dbManager struct {
closedFlag *atomic.Bool
dbCounter *openDBCounter
rootPath string
options []blobovnicza.Option
readOnly bool
metrics blobovnicza.Metrics
leafWidth uint64
log *logger.Logger
rootPath string
options []blobovnicza.Option
readOnly bool
metrics blobovnicza.Metrics
log *logger.Logger
}
func newDBManager(rootPath string, options []blobovnicza.Option, leafWidth uint64, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
func newDBManager(rootPath string, options []blobovnicza.Option, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
return &dbManager{
rootPath: rootPath,
options: options,
readOnly: readOnly,
metrics: metrics,
leafWidth: leafWidth,
levelToManager: make(map[string]*levelDbManager),
levelToManagerGuard: &sync.RWMutex{},
log: log,
@ -165,6 +245,17 @@ func (m *dbManager) GetByPath(path string) *sharedDB {
return levelManager.GetByIndex(curIndex)
}
func (m *dbManager) CleanResources(path string) {
lvlPath := filepath.Dir(path)
m.levelToManagerGuard.Lock()
defer m.levelToManagerGuard.Unlock()
if result, ok := m.levelToManager[lvlPath]; ok && !result.hasAnyDB() {
delete(m.levelToManager, lvlPath)
}
}
func (m *dbManager) Open() {
m.closedFlag.Store(false)
}
@ -197,7 +288,7 @@ func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager {
return result
}
result := newLevelDBManager(m.leafWidth, m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
result := newLevelDBManager(m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
m.levelToManager[lvlPath] = result
return result
}

View file

@ -6,6 +6,13 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
)
const (
rebuildStatusNotStarted = "not_started"
rebuildStatusRunning = "running"
rebuildStatusCompleted = "completed"
rebuildStatusFailed = "failed"
)
type Metrics interface {
Blobovnicza() blobovnicza.Metrics
@ -14,6 +21,10 @@ type Metrics interface {
SetMode(readOnly bool)
Close()
SetRebuildStatus(status string)
ObjectMoved(d time.Duration)
SetRebuildPercent(value uint32)
Delete(d time.Duration, success, withStorageID bool)
Exists(d time.Duration, success, withStorageID bool)
GetRange(d time.Duration, size int, success, withStorageID bool)
@ -27,6 +38,9 @@ type noopMetrics struct{}
func (m *noopMetrics) SetParentID(string) {}
func (m *noopMetrics) SetMode(bool) {}
func (m *noopMetrics) Close() {}
func (m *noopMetrics) SetRebuildStatus(string) {}
func (m *noopMetrics) SetRebuildPercent(uint32) {}
func (m *noopMetrics) ObjectMoved(time.Duration) {}
func (m *noopMetrics) Delete(time.Duration, bool, bool) {}
func (m *noopMetrics) Exists(time.Duration, bool, bool) {}
func (m *noopMetrics) GetRange(time.Duration, int, bool, bool) {}

View file

@ -2,6 +2,7 @@ package blobovniczatree
import (
"io/fs"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
@ -10,39 +11,48 @@ import (
)
type cfg struct {
log *logger.Logger
perm fs.FileMode
readOnly bool
rootPath string
openedCacheSize int
blzShallowDepth uint64
blzShallowWidth uint64
blzLeafWidth uint64
compression *compression.Config
blzOpts []blobovnicza.Option
// reportError is the function called when encountering disk errors.
reportError func(string, error)
metrics Metrics
log *logger.Logger
perm fs.FileMode
readOnly bool
rootPath string
openedCacheSize int
blzShallowDepth uint64
blzShallowWidth uint64
blzLeafWidth uint64
compression *compression.Config
blzOpts []blobovnicza.Option
reportError func(string, error) // reportError is the function called when encountering disk errors.
metrics Metrics
waitBeforeDropDB time.Duration
blzInitWorkerCount int
blzMoveBatchSize int
createDBInAdvance bool
}
type Option func(*cfg)
const (
defaultPerm = 0o700
defaultOpenedCacheSize = 50
defaultBlzShallowDepth = 2
defaultBlzShallowWidth = 16
defaultPerm = 0o700
defaultOpenedCacheSize = 50
defaultBlzShallowDepth = 2
defaultBlzShallowWidth = 16
defaultWaitBeforeDropDB = 10 * time.Second
defaultBlzInitWorkerCount = 5
defaulBlzMoveBatchSize = 10000
)
func initConfig(c *cfg) {
*c = cfg{
log: &logger.Logger{Logger: zap.L()},
perm: defaultPerm,
openedCacheSize: defaultOpenedCacheSize,
blzShallowDepth: defaultBlzShallowDepth,
blzShallowWidth: defaultBlzShallowWidth,
reportError: func(string, error) {},
metrics: &noopMetrics{},
log: &logger.Logger{Logger: zap.L()},
perm: defaultPerm,
openedCacheSize: defaultOpenedCacheSize,
blzShallowDepth: defaultBlzShallowDepth,
blzShallowWidth: defaultBlzShallowWidth,
reportError: func(string, error) {},
metrics: &noopMetrics{},
waitBeforeDropDB: defaultWaitBeforeDropDB,
blzInitWorkerCount: defaultBlzInitWorkerCount,
blzMoveBatchSize: defaulBlzMoveBatchSize,
}
}
@ -106,3 +116,34 @@ func WithMetrics(m Metrics) Option {
c.metrics = m
}
}
func WithWaitBeforeDropDB(t time.Duration) Option {
return func(c *cfg) {
c.waitBeforeDropDB = t
}
}
func WithMoveBatchSize(v int) Option {
return func(c *cfg) {
c.blzMoveBatchSize = v
}
}
// WithInitWorkerCount sets the maximum number of workers used to initialize the blobovnicza tree.
//
// A negative or zero value means no limit.
func WithInitWorkerCount(v int) Option {
if v <= 0 {
v = -1
}
return func(c *cfg) {
c.blzInitWorkerCount = v
}
}
// WithInitInAdvance returns an option to create blobovnicza tree DBs in advance.
func WithInitInAdvance(v bool) Option {
return func(c *cfg) {
c.createDBInAdvance = v
}
}
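The new options plug into the existing constructor alongside the tree shape settings. A minimal sketch of wiring them up; the root path and values are illustrative, the inline comments paraphrase the option docs above, and the time import is assumed.

func exampleTreeWithRebuildOptions() error {
	blz := NewBlobovniczaTree(
		WithRootPath("/srv/frostfs/blob/blobovnicza"), // illustrative path
		WithBlobovniczaShallowDepth(2),
		WithBlobovniczaShallowWidth(4),
		WithInitWorkerCount(10),  // limit concurrent blobovnicza initialization to 10 workers
		WithInitInAdvance(true),  // create blobovnicza DBs in advance during Init
		WithWaitBeforeDropDB(10*time.Second), // rebuild tuning knobs added in this PR
		WithMoveBatchSize(10000),
	)
	if err := blz.Open(false); err != nil {
		return err
	}
	if err := blz.Init(); err != nil {
		return err
	}
	return blz.Close()
}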

View file

@ -70,7 +70,7 @@ func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRe
type putIterator struct {
B *Blobovniczas
ID *blobovnicza.ID
ID *ID
AllFull bool
PutPrm blobovnicza.PutPrm
}
@ -104,7 +104,7 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
} else {
i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
zap.String("path", active.Path()),
zap.String("path", active.SystemPath()),
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
@ -112,8 +112,8 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
return false, nil
}
idx := u64FromHexString(filepath.Base(active.Path()))
i.ID = blobovnicza.NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))
idx := u64FromHexString(filepath.Base(active.SystemPath()))
i.ID = NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))
return true, nil
}

View file

@ -0,0 +1,478 @@
package blobovniczatree
import (
"bytes"
"context"
"errors"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
errBatchFull = errors.New("batch full")
)
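// Rebuild finishes moves interrupted by a previous run, collects DBs whose paths
// no longer match the configured tree shape, and migrates their objects into the
// current tree, dropping emptied DBs afterwards.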
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
if b.readOnly {
return common.RebuildRes{}, common.ErrReadOnly
}
b.metrics.SetRebuildStatus(rebuildStatusRunning)
b.metrics.SetRebuildPercent(0)
success := true
defer func() {
if success {
b.metrics.SetRebuildStatus(rebuildStatusCompleted)
} else {
b.metrics.SetRebuildStatus(rebuildStatusFailed)
}
}()
b.rebuildGuard.Lock()
defer b.rebuildGuard.Unlock()
var res common.RebuildRes
b.log.Debug(logs.BlobovniczaTreeCompletingPreviousRebuild)
	completedPreviousMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
	res.ObjectsMoved += completedPreviousMoves
if err != nil {
b.log.Warn(logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
success = false
return res, err
}
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
dbsToMigrate, err := b.getDBsToRebuild(ctx)
if err != nil {
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
success = false
return res, err
}
b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
res, err = b.migrateDBs(ctx, dbsToMigrate, prm, res)
if err != nil {
success = false
}
return res, err
}
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
var completedDBCount uint32
for _, db := range dbs {
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
res.ObjectsMoved += movedObjects
if err != nil {
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
return res, err
}
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
res.FilesRemoved++
completedDBCount++
b.metrics.SetRebuildPercent((100 * completedDBCount) / uint32(len(dbs)))
}
b.metrics.SetRebuildPercent(100)
return res, nil
}
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
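	// A DB needs rebuilding if it exists on disk but is not a valid leaf under the
	// current depth/width configuration: collect every existing path, then subtract
	// the paths that the current tree shape still expects.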
dbsToMigrate := make(map[string]struct{})
if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
dbsToMigrate[s] = struct{}{}
return false, nil
}); err != nil {
return nil, err
}
if err := b.iterateSortedLeaves(ctx, nil, func(s string) (bool, error) {
delete(dbsToMigrate, s)
return false, nil
}); err != nil {
return nil, err
}
result := make([]string, 0, len(dbsToMigrate))
for db := range dbsToMigrate {
result = append(result, db)
}
return result, nil
}
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
shDB := b.getBlobovnicza(path)
blz, err := shDB.Open()
if err != nil {
return 0, err
}
shDBClosed := false
defer func() {
if shDBClosed {
return
}
shDB.Close()
}()
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
if err != nil {
return migratedObjects, err
}
shDBClosed, err = b.dropDB(ctx, path, shDB)
return migratedObjects, err
}
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
var result atomic.Uint64
batch := make(map[oid.Address][]byte)
var prm blobovnicza.IteratePrm
prm.DecodeAddresses()
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
batch[ie.Address()] = bytes.Clone(ie.ObjectData())
if len(batch) == b.blzMoveBatchSize {
return errBatchFull
}
return nil
})
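	// Each pass over the source collects at most blzMoveBatchSize objects (errBatchFull
	// stops the iteration early), moves them concurrently under the worker limiter,
	// and repeats until a pass yields an empty batch.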
for {
_, err := blz.Iterate(ctx, prm)
if err != nil && !errors.Is(err, errBatchFull) {
return result.Load(), err
}
if len(batch) == 0 {
break
}
eg, egCtx := errgroup.WithContext(ctx)
for addr, data := range batch {
addr := addr
data := data
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
_ = eg.Wait()
return result.Load(), err
}
eg.Go(func() error {
defer limiter.ReleaseWorkSlot()
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
if err == nil {
result.Add(1)
}
return err
})
}
if err := eg.Wait(); err != nil {
return result.Load(), err
}
batch = make(map[oid.Address][]byte)
}
return result.Load(), nil
}
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
addr oid.Address, data []byte, metaStore common.MetaStorage,
) error {
startedAt := time.Now()
defer func() {
b.metrics.ObjectMoved(time.Since(startedAt))
}()
it := &moveIterator{
B: b,
ID: nil,
AllFull: true,
Address: addr,
ObjectData: data,
MetaStore: metaStore,
Source: source,
SourceSysPath: sourcePath,
}
if err := b.iterateDeepest(ctx, addr, func(lvlPath string) (bool, error) { return it.tryMoveToLvl(ctx, lvlPath) }); err != nil {
return err
} else if it.ID == nil {
if it.AllFull {
return common.ErrNoSpace
}
return errPutFailed
}
return nil
}
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
	case <-time.After(b.waitBeforeDropDB): // give in-flight requests that still reference the old storage ID time to complete
}
b.dbCache.EvictAndMarkNonCached(path)
defer b.dbCache.RemoveFromNonCached(path)
b.dbFilesGuard.Lock()
defer b.dbFilesGuard.Unlock()
if err := shDb.CloseAndRemoveFile(); err != nil {
return false, err
}
b.commondbManager.CleanResources(path)
if err := b.dropDirectoryIfEmpty(filepath.Dir(path)); err != nil {
return true, err
}
return true, nil
}
func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
if path == "." {
return nil
}
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if err != nil {
return err
}
if len(entries) > 0 {
return nil
}
if err := os.Remove(sysPath); err != nil {
return err
}
return b.dropDirectoryIfEmpty(filepath.Dir(path))
}
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
var count uint64
return count, b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
shDB := b.getBlobovnicza(s)
blz, err := shDB.Open()
if err != nil {
return true, err
}
defer shDB.Close()
incompletedMoves, err := blz.ListMoveInfo(ctx)
if err != nil {
return true, err
}
for _, move := range incompletedMoves {
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
return true, err
}
count++
}
return false, nil
})
}
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
move blobovnicza.MoveInfo, metaStore common.MetaStorage,
) error {
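	// Finish an interrupted move: if the object is still present in the source, make
	// sure it exists in the target, point the metabase at the target storage ID, then
	// delete the source copy and drop the recorded MoveInfo.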
targetDB := b.getBlobovnicza(NewIDFromBytes(move.TargetStorageID).Path())
target, err := targetDB.Open()
if err != nil {
return err
}
defer targetDB.Close()
existsInSource := true
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(move.Address)
gRes, err := source.Get(ctx, gPrm)
if err != nil {
if client.IsErrObjectNotFound(err) {
existsInSource = false
} else {
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
return err
}
}
if !existsInSource { // object was deleted by Rebuild, need to delete move info
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
return err
}
b.deleteProtectedObjects.Delete(move.Address)
return nil
}
existsInTarget, err := target.Exists(ctx, move.Address)
if err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
return err
}
if !existsInTarget {
var putPrm blobovnicza.PutPrm
putPrm.SetAddress(move.Address)
putPrm.SetMarshaledObject(gRes.Object())
_, err = target.Put(ctx, putPrm)
if err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToTargetDB, zap.String("path", targetDB.SystemPath()), zap.Error(err))
return err
}
}
if err = metaStore.UpdateStorageID(ctx, move.Address, move.TargetStorageID); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", move.Address))
return err
}
var deletePrm blobovnicza.DeletePrm
deletePrm.SetAddress(move.Address)
if _, err = source.Delete(ctx, deletePrm); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", sourcePath), zap.Error(err))
return err
}
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
return err
}
b.deleteProtectedObjects.Delete(move.Address)
return nil
}
type moveIterator struct {
B *Blobovniczas
ID *ID
AllFull bool
Address oid.Address
ObjectData []byte
MetaStore common.MetaStorage
Source *blobovnicza.Blobovnicza
SourceSysPath string
}
func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, error) {
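	// Move protocol: record MoveInfo in the source DB, mark the address as
	// delete-protected, put the object into the active target DB, update the metabase
	// storage ID, and only then delete the source copy and drop the MoveInfo.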
target, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
if err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, zap.Error(err))
}
return false, nil
}
if target == nil {
i.B.log.Warn(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
return false, nil
}
defer target.Close()
i.AllFull = false
targetIDx := u64FromHexString(filepath.Base(target.SystemPath()))
targetStorageID := NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(targetIDx))))
if err = i.Source.PutMoveInfo(ctx, blobovnicza.MoveInfo{
Address: i.Address,
TargetStorageID: targetStorageID.Bytes(),
}); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
i.B.deleteProtectedObjects.Add(i.Address)
var putPrm blobovnicza.PutPrm
putPrm.SetAddress(i.Address)
putPrm.SetMarshaledObject(i.ObjectData)
_, err = target.Blobovnicza().Put(ctx, putPrm)
if err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, zap.String("path", target.SystemPath()), zap.Error(err))
}
return true, nil
}
if err = i.MetaStore.UpdateStorageID(ctx, i.Address, targetStorageID.Bytes()); err != nil {
i.B.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", i.Address))
return true, nil
}
var deletePrm blobovnicza.DeletePrm
deletePrm.SetAddress(i.Address)
if _, err = i.Source.Delete(ctx, deletePrm); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotDeleteFromSource, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
if err = i.Source.DropMoveInfo(ctx, i.Address); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotDropMoveInfo, err)
} else {
i.B.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
i.B.deleteProtectedObjects.Delete(i.Address)
i.ID = targetStorageID
return true, nil
}
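// addressMap tracks addresses that must not be deleted while their move is in flight.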
type addressMap struct {
data map[oid.Address]struct{}
guard *sync.RWMutex
}
func newAddressMap() *addressMap {
return &addressMap{
data: make(map[oid.Address]struct{}),
guard: &sync.RWMutex{},
}
}
func (m *addressMap) Add(address oid.Address) {
m.guard.Lock()
defer m.guard.Unlock()
m.data[address] = struct{}{}
}
func (m *addressMap) Delete(address oid.Address) {
m.guard.Lock()
defer m.guard.Unlock()
delete(m.data, address)
}
func (m *addressMap) Contains(address oid.Address) bool {
m.guard.RLock()
defer m.guard.RUnlock()
_, contains := m.data[address]
return contains
}

View file

@ -0,0 +1,195 @@
package blobovniczatree
import (
"bytes"
"context"
"path/filepath"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
func TestRebuildFailover(t *testing.T) {
t.Parallel()
t.Run("only move info saved", testRebuildFailoverOnlyMoveInfoSaved)
t.Run("object saved to target", testRebuildFailoverObjectSavedToTarget)
t.Run("object deleted from source", testRebuildFailoverObjectDeletedFromSource)
}
func testRebuildFailoverOnlyMoveInfoSaved(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, true)
}
func testRebuildFailoverObjectSavedToTarget(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, true)
}
func testRebuildFailoverObjectDeletedFromSource(t *testing.T) {
t.Parallel()
dir := t.TempDir()
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
require.NoError(t, err)
require.NoError(t, blz.PutMoveInfo(context.Background(), blobovnicza.MoveInfo{
Address: object.AddressOf(obj),
TargetStorageID: []byte("0/0/0"),
}))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
var pPrm blobovnicza.PutPrm
pPrm.SetAddress(object.AddressOf(obj))
pPrm.SetMarshaledObject(data)
_, err = blz.Put(context.Background(), pPrm)
require.NoError(t, err)
require.NoError(t, blz.Close())
testRebuildFailoverValidate(t, dir, obj, false)
}
func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object, mustUpdateStorageID bool) {
b := NewBlobovniczaTree(
WithLogger(test.NewLogger(t, true)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(2),
WithBlobovniczaShallowDepth(2),
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
var dPrm common.DeletePrm
dPrm.Address = object.AddressOf(obj)
dPrm.StorageID = []byte("0/0/1")
_, err := b.Delete(context.Background(), dPrm)
require.ErrorIs(t, err, errObjectIsDeleteProtected)
metaStub := &storageIDUpdateStub{
storageIDs: make(map[oid.Address][]byte),
guard: &sync.Mutex{},
}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
})
require.NoError(t, err)
require.Equal(t, uint64(1), rRes.ObjectsMoved)
require.Equal(t, uint64(0), rRes.FilesRemoved)
require.NoError(t, b.Close())
blz := blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "1.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
moveInfo, err := blz.ListMoveInfo(context.Background())
require.NoError(t, err)
require.Equal(t, 0, len(moveInfo))
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(object.AddressOf(obj))
_, err = blz.Get(context.Background(), gPrm)
require.True(t, client.IsErrObjectNotFound(err))
require.NoError(t, blz.Close())
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
require.NoError(t, blz.Open())
require.NoError(t, blz.Init())
moveInfo, err = blz.ListMoveInfo(context.Background())
require.NoError(t, err)
require.Equal(t, 0, len(moveInfo))
gRes, err := blz.Get(context.Background(), gPrm)
require.NoError(t, err)
require.True(t, len(gRes.Object()) > 0)
if mustUpdateStorageID {
require.True(t, bytes.Equal([]byte("0/0/0"), metaStub.storageIDs[object.AddressOf(obj)]))
}
require.NoError(t, blz.Close())
}

View file

@ -0,0 +1,145 @@
package blobovniczatree
import (
"context"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestBlobovniczaTreeRebuild(t *testing.T) {
t.Parallel()
t.Run("width increased", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 3, false)
})
t.Run("width reduced", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 1, true)
})
t.Run("depth increased", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 1, 2, 2, 2, true)
})
t.Run("depth reduced", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 1, 2, true)
})
}
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
dir := t.TempDir()
b := NewBlobovniczaTree(
WithLogger(test.NewLogger(t, true)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(sourceWidth),
WithBlobovniczaShallowDepth(sourceDepth),
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
eg, egCtx := errgroup.WithContext(context.Background())
storageIDs := make(map[oid.Address][]byte)
storageIDsGuard := &sync.Mutex{}
for i := 0; i < 1000; i++ {
eg.Go(func() error {
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
if err != nil {
return err
}
var prm common.PutPrm
prm.Address = object.AddressOf(obj)
prm.RawData = data
res, err := b.Put(egCtx, prm)
if err != nil {
return err
}
storageIDsGuard.Lock()
storageIDs[prm.Address] = res.StorageID
storageIDsGuard.Unlock()
return nil
})
}
require.NoError(t, eg.Wait())
require.NoError(t, b.Close())
b = NewBlobovniczaTree(
WithLogger(test.NewLogger(t, true)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(targetWidth),
WithBlobovniczaShallowDepth(targetDepth),
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
metaStub := &storageIDUpdateStub{
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub
rPrm.WorkerLimiter = &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
require.Equal(t, shouldMigrate, dataMigrated)
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
require.NoError(t, b.Close())
}
type storageIDUpdateStub struct {
guard *sync.Mutex
storageIDs map[oid.Address][]byte
updatedCount uint64
}
func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
s.guard.Lock()
defer s.guard.Unlock()
s.storageIDs[addr] = storageID
s.updatedCount++
return nil
}
type rebuildLimiterStub struct{}
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
func (s *rebuildLimiterStub) ReleaseWorkSlot() {}

View file

@ -0,0 +1,26 @@
package common
import (
"context"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type RebuildRes struct {
ObjectsMoved uint64
FilesRemoved uint64
}
type RebuildPrm struct {
MetaStorage MetaStorage
WorkerLimiter ConcurrentWorkersLimiter
}
type MetaStorage interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
}
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}

View file

@ -30,4 +30,5 @@ type Storage interface {
Put(context.Context, PutPrm) (PutRes, error)
Delete(context.Context, DeletePrm) (DeleteRes, error)
Iterate(context.Context, IteratePrm) (IterateRes, error)
Rebuild(context.Context, RebuildPrm) (RebuildRes, error)
}

View file

@ -574,3 +574,7 @@ func (t *FSTree) SetReportErrorFunc(_ func(string, error)) {
func (t *FSTree) SetParentID(parentID string) {
t.metrics.SetParentID(parentID)
}
func (t *FSTree) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}

View file

@ -159,3 +159,7 @@ func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common
}
return common.IterateRes{}, nil
}
func (s *memstoreImpl) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}

View file

@ -0,0 +1,45 @@
package blobstor
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type StorageIDUpdate interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
}
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter) error {
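	// Rebuild every substorage sequentially and stop on the first failure; a summary
	// of moved objects and removed files is logged either way.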
var summary common.RebuildRes
var rErr error
for _, storage := range b.storage {
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
MetaStorage: upd,
WorkerLimiter: limiter,
})
summary.FilesRemoved += res.FilesRemoved
summary.ObjectsMoved += res.ObjectsMoved
if err != nil {
b.log.Error(logs.BlobstorRebuildFailedToRebuildStorages,
zap.String("failed_storage_path", storage.Storage.Path()),
zap.String("failed_storage_type", storage.Storage.Type()),
zap.Error(err))
rErr = err
break
}
}
b.log.Info(logs.BlobstorRebuildRebuildStoragesCompleted,
zap.Bool("success", rErr == nil),
zap.Uint64("total_files_removed", summary.FilesRemoved),
zap.Uint64("total_objects_moved", summary.ObjectsMoved))
return rErr
}

View file

@ -229,3 +229,7 @@ func (s *TestStore) Iterate(ctx context.Context, req common.IteratePrm) (common.
}
func (s *TestStore) SetParentID(string) {}
func (s *TestStore) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
return common.RebuildRes{}, nil
}

View file

@ -38,6 +38,7 @@ type StorageEngine struct {
err error
}
evacuateLimiter *evacuationLimiter
rebuildLimiter *rebuildLimiter
}
type shardWrapper struct {
@ -215,13 +216,15 @@ type cfg struct {
shardPoolSize uint32
lowMem bool
rebuildWorkersCount uint32
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
rebuildWorkersCount: 100,
}
}
@ -240,6 +243,7 @@ func New(opts ...Option) *StorageEngine {
closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{},
rebuildLimiter: newRebuildLimiter(c.rebuildWorkersCount),
}
}
@ -277,3 +281,10 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
c.lowMem = lowMemCons
}
}
// WithRebuildWorkersCount returns an option to set the count of concurrent rebuild workers.
func WithRebuildWorkersCount(count uint32) Option {
return func(c *cfg) {
c.rebuildWorkersCount = count
}
}

View file

@ -0,0 +1,26 @@
package engine
import "context"
type rebuildLimiter struct {
semaphore chan struct{}
}
func newRebuildLimiter(workersCount uint32) *rebuildLimiter {
return &rebuildLimiter{
semaphore: make(chan struct{}, workersCount),
}
}
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
select {
case l.semaphore <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (l *rebuildLimiter) ReleaseWorkSlot() {
<-l.semaphore
}

View file

@ -118,6 +118,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
shard.WithExpiredLocksCallback(e.processExpiredLocks),
shard.WithDeletedLockCallback(e.processDeletedLocks),
shard.WithReportErrorFunc(e.reportShardErrorBackground),
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
)...)
if err := sh.UpdateID(ctx); err != nil {

View file

@ -42,6 +42,18 @@ func (m *blobovniczaTreeMetrics) Close() {
m.m.CloseBlobobvnizcaTree(m.shardID, m.path)
}
func (m *blobovniczaTreeMetrics) SetRebuildStatus(status string) {
m.m.BlobovniczaTreeRebuildStatus(m.shardID, m.path, status)
}
func (m *blobovniczaTreeMetrics) SetRebuildPercent(value uint32) {
m.m.BlobovniczaTreeRebuildPercent(m.shardID, m.path, value)
}
func (m *blobovniczaTreeMetrics) ObjectMoved(d time.Duration) {
m.m.BlobovniczaTreeObjectMoved(m.shardID, m.path, d)
}
func (m *blobovniczaTreeMetrics) Delete(d time.Duration, success, withStorageID bool) {
m.m.BlobobvnizcaTreeMethodDuration(m.shardID, m.path, "Delete", d, success, metrics_impl.NullBool{Valid: true, Bool: withStorageID})
}

View file

@ -162,6 +162,9 @@ func (s *Shard) Init(ctx context.Context) error {
s.gc.init(ctx)
s.rb = newRebuilder(s.rebuildLimiter)
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
return nil
}
@ -266,6 +269,9 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
// Close releases all Shard's components.
func (s *Shard) Close() error {
if s.rb != nil {
s.rb.Stop(s.log)
}
components := []interface{ Close() error }{}
if s.pilorama != nil {
@ -310,6 +316,11 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
unlock := s.lockExclusive()
defer unlock()
s.rb.Stop(s.log)
defer func() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
}()
ok, err := s.metaBase.Reload(c.metaOpts...)
if err != nil {
if errors.Is(err, meta.ErrDegradedMode) {

View file

@ -0,0 +1,13 @@
package shard
import "context"
type RebuildWorkerLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
type noopRebuildLimiter struct{}
func (l *noopRebuildLimiter) AcquireWorkSlot(context.Context) error { return nil }
func (l *noopRebuildLimiter) ReleaseWorkSlot() {}

View file

@ -0,0 +1,98 @@
package shard
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type rebuilder struct {
mtx *sync.Mutex
wg *sync.WaitGroup
cancel func()
limiter RebuildWorkerLimiter
}
func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
return &rebuilder{
mtx: &sync.Mutex{},
wg: &sync.WaitGroup{},
cancel: nil,
limiter: l,
}
}
func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.start(ctx, bs, mb, log)
}
func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
if r.cancel != nil {
r.stop(log)
}
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.wg.Add(1)
go func() {
defer r.wg.Done()
log.Info(logs.BlobstoreRebuildStarted)
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, r.limiter); err != nil {
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
} else {
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
}
}()
}
func (r *rebuilder) Stop(log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.stop(log)
}
func (r *rebuilder) stop(log *logger.Logger) {
if r.cancel == nil {
return
}
r.cancel()
r.wg.Wait()
r.cancel = nil
log.Info(logs.BlobstoreRebuildStopped)
}
var errMBIsNotAvailable = errors.New("metabase is not available")
type mbStorageIDUpdate struct {
mb *meta.DB
}
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if u.mb == nil {
return errMBIsNotAvailable
}
var prm meta.UpdateStorageIDPrm
prm.SetAddress(addr)
prm.SetStorageID(storageID)
_, err := u.mb.UpdateStorageID(ctx, prm)
return err
}

View file

@ -39,6 +39,8 @@ type Shard struct {
tsSource TombstoneSource
rb *rebuilder
gcCancel atomic.Value
setModeRequested atomic.Bool
}
@ -125,6 +127,8 @@ type cfg struct {
metricsWriter MetricsWriter
reportErrorFunc func(selfID string, message string, err error)
rebuildLimiter RebuildWorkerLimiter
}
func defaultCfg() *cfg {
@ -133,6 +137,7 @@ func defaultCfg() *cfg {
log: &logger.Logger{Logger: zap.L()},
gcCfg: defaultGCCfg(),
reportErrorFunc: func(string, string, error) {},
rebuildLimiter: &noopRebuildLimiter{},
}
}
@ -370,6 +375,14 @@ func WithExpiredCollectorWorkerCount(count int) Option {
}
}
// WithRebuildWorkerLimiter returns an option to set the limiter that bounds
// the number of concurrent workers of the storage rebuild operation.
func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
return func(c *cfg) {
c.rebuildLimiter = l
}
}
func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()

View file

@ -23,16 +23,23 @@ type BlobobvnizcaMetrics interface {
IncOpenBlobovniczaCount(shardID, path string)
DecOpenBlobovniczaCount(shardID, path string)
BlobovniczaTreeRebuildStatus(shardID, path, status string)
BlobovniczaTreeRebuildPercent(shardID, path string, value uint32)
BlobovniczaTreeObjectMoved(shardID, path string, d time.Duration)
}
type blobovnicza struct {
treeMode *shardIDPathModeValue
treeReqDuration *prometheus.HistogramVec
treePut *prometheus.CounterVec
treeGet *prometheus.CounterVec
treeOpenSize *prometheus.GaugeVec
treeOpenItems *prometheus.GaugeVec
treeOpenCounter *prometheus.GaugeVec
treeMode *shardIDPathModeValue
treeReqDuration *prometheus.HistogramVec
treePut *prometheus.CounterVec
treeGet *prometheus.CounterVec
treeOpenSize *prometheus.GaugeVec
treeOpenItems *prometheus.GaugeVec
treeOpenCounter *prometheus.GaugeVec
treeObjectMoveDuration *prometheus.HistogramVec
treeRebuildStatus *shardIDPathModeValue
treeRebuildPercent *prometheus.GaugeVec
}
func newBlobovnicza() *blobovnicza {
@ -75,6 +82,19 @@ func newBlobovnicza() *blobovnicza {
Name: "open_blobovnicza_count",
Help: "Count of opened blobovniczas of Blobovnicza tree",
}, []string{shardIDLabel, pathLabel}),
treeObjectMoveDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: blobovniczaTreeSubSystem,
Name: "object_move_duration_seconds",
Help: "Accumulated Blobovnicza tree object move duration",
}, []string{shardIDLabel, pathLabel}),
treeRebuildStatus: newShardIDPathMode(blobovniczaTreeSubSystem, "rebuild_status", "Blobovnicza tree rebuild status"),
treeRebuildPercent: metrics.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: blobovniczaTreeSubSystem,
Name: "rebuild_complete_percent",
Help: "Percent of rebuild completeness",
}, []string{shardIDLabel, pathLabel}),
}
}
@ -96,6 +116,15 @@ func (b *blobovnicza) CloseBlobobvnizcaTree(shardID, path string) {
shardIDLabel: shardID,
pathLabel: path,
})
b.treeObjectMoveDuration.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.treeRebuildPercent.DeletePartialMatch(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
})
b.treeRebuildStatus.SetMode(shardID, path, undefinedStatus)
}
func (b *blobovnicza) BlobobvnizcaTreeMethodDuration(shardID, path string, method string, d time.Duration, success bool, withStorageID NullBool) {
@ -163,3 +192,21 @@ func (b *blobovnicza) SubOpenBlobovniczaItems(shardID, path string, items uint64
pathLabel: path,
}).Sub(float64(items))
}
func (b *blobovnicza) BlobovniczaTreeRebuildStatus(shardID, path, status string) {
b.treeRebuildStatus.SetMode(shardID, path, status)
}
func (b *blobovnicza) BlobovniczaTreeObjectMoved(shardID, path string, d time.Duration) {
b.treeObjectMoveDuration.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Observe(d.Seconds())
}
func (b *blobovnicza) BlobovniczaTreeRebuildPercent(shardID, path string, value uint32) {
b.treeRebuildPercent.With(prometheus.Labels{
shardIDLabel: shardID,
pathLabel: path,
}).Set(float64(value))
}

View file

@ -45,4 +45,5 @@ const (
failedToDeleteStatus = "failed_to_delete"
deletedStatus = "deleted"
undefinedStatus = "undefined"
)