diff --git a/pkg/local_object_storage/blobovnicza/sizes.go b/pkg/local_object_storage/blobovnicza/sizes.go index 1dff75ae..9bbed0db 100644 --- a/pkg/local_object_storage/blobovnicza/sizes.go +++ b/pkg/local_object_storage/blobovnicza/sizes.go @@ -57,3 +57,7 @@ func (b *Blobovnicza) itemDeleted(itemSize uint64) { func (b *Blobovnicza) IsFull() bool { return b.dataSize.Load() >= b.fullSizeLimit } + +func (b *Blobovnicza) FillPercent() int { + return int(100.0 * (float64(b.dataSize.Load()) / float64(b.fullSizeLimit))) +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go index 058fe1fb..b7f20822 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" "os" "path/filepath" "strings" @@ -59,7 +60,7 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess) b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild) - dbsToMigrate, err := b.getDBsToRebuild(ctx) + dbsToMigrate, err := b.getDBsToRebuild(ctx, prm.Action) if err != nil { b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err)) success = false @@ -93,7 +94,33 @@ func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common. return res, nil } -func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) { +func (b *Blobovniczas) getDBsToRebuild(ctx context.Context, action common.RebuildAction) ([]string, error) { + schemaChange := make(map[string]struct{}) + fillPercent := make(map[string]struct{}) + var err error + if action.SchemaChange { + schemaChange, err = b.selectDBsDoNotMatchSchema(ctx) + if err != nil { + return nil, err + } + } + if action.FillPercent { + fillPercent, err = b.selectDBsDoNotMatchFillPercent(ctx, action.FillPercentValue) + if err != nil { + return nil, err + } + } + for k := range fillPercent { + schemaChange[k] = struct{}{} + } + result := make([]string, 0, len(schemaChange)) + for db := range schemaChange { + result = append(result, db) + } + return result, nil +} + +func (b *Blobovniczas) selectDBsDoNotMatchSchema(ctx context.Context) (map[string]struct{}, error) { dbsToMigrate := make(map[string]struct{}) if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) { dbsToMigrate[s] = struct{}{} @@ -107,13 +134,69 @@ func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) { }); err != nil { return nil, err } - result := make([]string, 0, len(dbsToMigrate)) - for db := range dbsToMigrate { - result = append(result, db) + return dbsToMigrate, nil +} + +func (b *Blobovniczas) selectDBsDoNotMatchFillPercent(ctx context.Context, target int) (map[string]struct{}, error) { + if target <= 0 || target > 100 { + return nil, fmt.Errorf("invalid fill percent value %d: must be (0; 100]", target) + } + result := make(map[string]struct{}) + if err := b.iterateDeepest(ctx, oid.Address{}, func(lvlPath string) (bool, error) { + dir := filepath.Join(b.rootPath, lvlPath) + entries, err := os.ReadDir(dir) + if os.IsNotExist(err) { // non initialized tree + return false, nil + } + if err != nil { + return false, err + } + hasDBs := false + // db with maxIdx could be an active, so it should not be rebuilded + var maxIdx uint64 + for _, e := range entries { + if e.IsDir() || strings.HasSuffix(e.Name(), rebuildSuffix) { + continue + } + hasDBs = true + maxIdx = max(u64FromHexString(e.Name()), maxIdx) + } + if !hasDBs { + return false, nil + } + for _, e := range entries { + if e.IsDir() || strings.HasSuffix(e.Name(), rebuildSuffix) { + continue + } + if u64FromHexString(e.Name()) == maxIdx { + continue + } + path := filepath.Join(lvlPath, e.Name()) + resettlementRequired, err := b.fillPercentIsLow(path, target) + if err != nil { + return false, err + } + if resettlementRequired { + result[path] = struct{}{} + } + } + return false, nil + }); err != nil { + return nil, err } return result, nil } +func (b *Blobovniczas) fillPercentIsLow(path string, target int) (bool, error) { + shDB := b.getBlobovnicza(path) + blz, err := shDB.Open() + if err != nil { + return false, err + } + defer shDB.Close() + return blz.FillPercent() < target, nil +} + func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) { shDB := b.getBlobovnicza(path) blz, err := shDB.Open() diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go index 4a51fd86..62ae9ea9 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go @@ -15,7 +15,7 @@ import ( "golang.org/x/sync/errgroup" ) -func TestBlobovniczaTreeRebuild(t *testing.T) { +func TestBlobovniczaTreeSchemaRebuild(t *testing.T) { t.Parallel() t.Run("width increased", func(t *testing.T) { @@ -39,6 +39,197 @@ func TestBlobovniczaTreeRebuild(t *testing.T) { }) } +func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) { + t.Parallel() + + t.Run("no rebuild by fill percent", func(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + b := NewBlobovniczaTree( + context.Background(), + WithLogger(test.NewLogger(t)), + WithObjectSizeLimit(64*1024), + WithBlobovniczaShallowWidth(1), // single directory + WithBlobovniczaShallowDepth(1), + WithRootPath(dir), + WithBlobovniczaSize(100*1024), // 100 KB limit for each blobovnicza + WithWaitBeforeDropDB(0), + WithOpenedCacheSize(1000), + WithMoveBatchSize(3)) + require.NoError(t, b.Open(mode.ComponentReadWrite)) + require.NoError(t, b.Init()) + + storageIDs := make(map[oid.Address][]byte) + for i := 0; i < 100; i++ { + obj := blobstortest.NewObject(64 * 1024) // 64KB object + data, err := obj.Marshal() + require.NoError(t, err) + var prm common.PutPrm + prm.Address = object.AddressOf(obj) + prm.RawData = data + res, err := b.Put(context.Background(), prm) + require.NoError(t, err) + storageIDs[prm.Address] = res.StorageID + } + metaStub := &storageIDUpdateStub{ + storageIDs: storageIDs, + guard: &sync.Mutex{}, + } + rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ + MetaStorage: metaStub, + WorkerLimiter: &rebuildLimiterStub{}, + Action: common.RebuildAction{ + SchemaChange: false, + FillPercent: true, + FillPercentValue: 60, + }, + }) + require.NoError(t, err) + dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 + require.False(t, dataMigrated) + + for addr, storageID := range storageIDs { + var gPrm common.GetPrm + gPrm.Address = addr + gPrm.StorageID = storageID + _, err := b.Get(context.Background(), gPrm) + require.NoError(t, err) + } + + require.NoError(t, b.Close()) + }) + + t.Run("no rebuild single db", func(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + b := NewBlobovniczaTree( + context.Background(), + WithLogger(test.NewLogger(t)), + WithObjectSizeLimit(64*1024), + WithBlobovniczaShallowWidth(1), // single directory + WithBlobovniczaShallowDepth(1), + WithRootPath(dir), + WithBlobovniczaSize(100*1024), // 100 KB soft limit for each blobovnicza + WithWaitBeforeDropDB(0), + WithOpenedCacheSize(1000), + WithMoveBatchSize(3)) + require.NoError(t, b.Open(mode.ComponentReadWrite)) + require.NoError(t, b.Init()) + + storageIDs := make(map[oid.Address][]byte) + obj := blobstortest.NewObject(64 * 1024) // 64KB object + data, err := obj.Marshal() + require.NoError(t, err) + var prm common.PutPrm + prm.Address = object.AddressOf(obj) + prm.RawData = data + res, err := b.Put(context.Background(), prm) + require.NoError(t, err) + storageIDs[prm.Address] = res.StorageID + metaStub := &storageIDUpdateStub{ + storageIDs: storageIDs, + guard: &sync.Mutex{}, + } + rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ + MetaStorage: metaStub, + WorkerLimiter: &rebuildLimiterStub{}, + Action: common.RebuildAction{ + SchemaChange: false, + FillPercent: true, + FillPercentValue: 90, // 64KB / 100KB = 64% + }, + }) + require.NoError(t, err) + dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 + require.False(t, dataMigrated) + + for addr, storageID := range storageIDs { + var gPrm common.GetPrm + gPrm.Address = addr + gPrm.StorageID = storageID + _, err := b.Get(context.Background(), gPrm) + require.NoError(t, err) + } + + require.NoError(t, b.Close()) + }) + + t.Run("rebuild by fill percent", func(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + b := NewBlobovniczaTree( + context.Background(), + WithLogger(test.NewLogger(t)), + WithObjectSizeLimit(64*1024), + WithBlobovniczaShallowWidth(1), // single directory + WithBlobovniczaShallowDepth(1), + WithRootPath(dir), + WithBlobovniczaSize(100*1024), // 100 KB limit for each blobovnicza + WithWaitBeforeDropDB(0), + WithOpenedCacheSize(1000), + WithMoveBatchSize(3)) + require.NoError(t, b.Open(mode.ComponentReadWrite)) + require.NoError(t, b.Init()) + + storageIDs := make(map[oid.Address][]byte) + toDelete := make(map[oid.Address][]byte) + for i := 0; i < 100; i++ { // 2 objects for one blobovnicza, so 50 DBs total will be created + obj := blobstortest.NewObject(64 * 1024) + data, err := obj.Marshal() + require.NoError(t, err) + var prm common.PutPrm + prm.Address = object.AddressOf(obj) + prm.RawData = data + res, err := b.Put(context.Background(), prm) + require.NoError(t, err) + storageIDs[prm.Address] = res.StorageID + if i%2 == 1 { + toDelete[prm.Address] = res.StorageID + } + } + for addr, storageID := range toDelete { + var prm common.DeletePrm + prm.Address = addr + prm.StorageID = storageID + _, err := b.Delete(context.Background(), prm) + require.NoError(t, err) + } + metaStub := &storageIDUpdateStub{ + storageIDs: storageIDs, + guard: &sync.Mutex{}, + } + rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ + MetaStorage: metaStub, + WorkerLimiter: &rebuildLimiterStub{}, + Action: common.RebuildAction{ + SchemaChange: false, + FillPercent: true, + FillPercentValue: 80, + }, + }) + require.NoError(t, err) + require.Equal(t, uint64(49), rRes.FilesRemoved) + require.Equal(t, uint64(49), rRes.ObjectsMoved) // 49 DBs with 1 objects + require.Equal(t, uint64(49), metaStub.updatedCount) + + for addr, storageID := range storageIDs { + if _, found := toDelete[addr]; found { + continue + } + var gPrm common.GetPrm + gPrm.Address = addr + gPrm.StorageID = storageID + _, err := b.Get(context.Background(), gPrm) + require.NoError(t, err) + } + + require.NoError(t, b.Close()) + }) +} + func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) { t.Parallel() @@ -92,6 +283,7 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) { var rPrm common.RebuildPrm rPrm.MetaStorage = metaStub rPrm.WorkerLimiter = &rebuildLimiterStub{} + rPrm.Action = common.RebuildAction{SchemaChange: true} rRes, err := b.Rebuild(context.Background(), rPrm) require.NoError(t, err) dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 @@ -180,6 +372,7 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta var rPrm common.RebuildPrm rPrm.MetaStorage = metaStub rPrm.WorkerLimiter = &rebuildLimiterStub{} + rPrm.Action = common.RebuildAction{SchemaChange: true} rRes, err := b.Rebuild(context.Background(), rPrm) require.NoError(t, err) dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 diff --git a/pkg/local_object_storage/blobstor/common/rebuild.go b/pkg/local_object_storage/blobstor/common/rebuild.go index 9f629ef8..020d9d02 100644 --- a/pkg/local_object_storage/blobstor/common/rebuild.go +++ b/pkg/local_object_storage/blobstor/common/rebuild.go @@ -11,9 +11,17 @@ type RebuildRes struct { FilesRemoved uint64 } +type RebuildAction struct { + SchemaChange bool + + FillPercent bool + FillPercentValue int +} + type RebuildPrm struct { MetaStorage MetaStorage WorkerLimiter ConcurrentWorkersLimiter + Action RebuildAction } type MetaStorage interface { diff --git a/pkg/local_object_storage/blobstor/rebuild.go b/pkg/local_object_storage/blobstor/rebuild.go index 101c6075..31bc2d16 100644 --- a/pkg/local_object_storage/blobstor/rebuild.go +++ b/pkg/local_object_storage/blobstor/rebuild.go @@ -18,13 +18,14 @@ type ConcurrentWorkersLimiter interface { ReleaseWorkSlot() } -func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter) error { +func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, action common.RebuildAction) error { var summary common.RebuildRes var rErr error for _, storage := range b.storage { res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{ MetaStorage: upd, WorkerLimiter: limiter, + Action: action, }) summary.FilesRemoved += res.FilesRemoved summary.ObjectsMoved += res.ObjectsMoved diff --git a/pkg/local_object_storage/shard/rebuild.go b/pkg/local_object_storage/shard/rebuild.go new file mode 100644 index 00000000..998fcf08 --- /dev/null +++ b/pkg/local_object_storage/shard/rebuild.go @@ -0,0 +1,173 @@ +package shard + +import ( + "context" + "errors" + "sync" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" +) + +type RebuildWorkerLimiter interface { + AcquireWorkSlot(ctx context.Context) error + ReleaseWorkSlot() +} + +type rebuildLimiter struct { + semaphore chan struct{} +} + +func newRebuildLimiter(workersCount uint32) *rebuildLimiter { + return &rebuildLimiter{ + semaphore: make(chan struct{}, workersCount), + } +} + +func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error { + select { + case l.semaphore <- struct{}{}: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (l *rebuildLimiter) ReleaseWorkSlot() { + <-l.semaphore +} + +type rebuildTask struct { + limiter RebuildWorkerLimiter + action common.RebuildAction +} + +type rebuilder struct { + mtx *sync.Mutex + wg *sync.WaitGroup + cancel func() + limiter RebuildWorkerLimiter + done chan struct{} + tasks chan rebuildTask +} + +func newRebuilder(l RebuildWorkerLimiter) *rebuilder { + return &rebuilder{ + mtx: &sync.Mutex{}, + wg: &sync.WaitGroup{}, + limiter: l, + tasks: make(chan rebuildTask, 10), + } +} + +func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) { + r.mtx.Lock() + defer r.mtx.Unlock() + + if r.done != nil { + return // already started + } + ctx, cancel := context.WithCancel(ctx) + r.cancel = cancel + r.done = make(chan struct{}) + r.wg.Add(1) + go func() { + defer r.wg.Done() + for { + select { + case <-r.done: + return + case t, ok := <-r.tasks: + if !ok { + continue + } + runRebuild(ctx, bs, mb, log, t.action, t.limiter) + } + } + }() + select { + case <-ctx.Done(): + return + case r.tasks <- rebuildTask{ + limiter: r.limiter, + action: common.RebuildAction{ + SchemaChange: true, + }, + }: + return + } +} + +func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger, + action common.RebuildAction, limiter RebuildWorkerLimiter, +) { + select { + case <-ctx.Done(): + return + default: + } + log.Info(logs.BlobstoreRebuildStarted) + if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, action); err != nil { + log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err)) + } else { + log.Info(logs.BlobstoreRebuildCompletedSuccessfully) + } +} + +func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, action common.RebuildAction, +) error { + select { + case <-ctx.Done(): + return ctx.Err() + case r.tasks <- rebuildTask{ + limiter: limiter, + action: action, + }: + return nil + } +} + +func (r *rebuilder) Stop(log *logger.Logger) { + r.mtx.Lock() + defer r.mtx.Unlock() + + if r.done != nil { + close(r.done) + } + if r.cancel != nil { + r.cancel() + } + r.wg.Wait() + r.cancel = nil + r.done = nil + log.Info(logs.BlobstoreRebuildStopped) +} + +var errMBIsNotAvailable = errors.New("metabase is not available") + +type mbStorageIDUpdate struct { + mb *meta.DB +} + +func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if u.mb == nil { + return errMBIsNotAvailable + } + + var prm meta.UpdateStorageIDPrm + prm.SetAddress(addr) + prm.SetStorageID(storageID) + _, err := u.mb.UpdateStorageID(ctx, prm) + return err +} diff --git a/pkg/local_object_storage/shard/rebuild_limiter.go b/pkg/local_object_storage/shard/rebuild_limiter.go deleted file mode 100644 index efc21837..00000000 --- a/pkg/local_object_storage/shard/rebuild_limiter.go +++ /dev/null @@ -1,13 +0,0 @@ -package shard - -import "context" - -type RebuildWorkerLimiter interface { - AcquireWorkSlot(ctx context.Context) error - ReleaseWorkSlot() -} - -type noopRebuildLimiter struct{} - -func (l *noopRebuildLimiter) AcquireWorkSlot(context.Context) error { return nil } -func (l *noopRebuildLimiter) ReleaseWorkSlot() {} diff --git a/pkg/local_object_storage/shard/rebuilder.go b/pkg/local_object_storage/shard/rebuilder.go deleted file mode 100644 index f18573c5..00000000 --- a/pkg/local_object_storage/shard/rebuilder.go +++ /dev/null @@ -1,98 +0,0 @@ -package shard - -import ( - "context" - "errors" - "sync" - - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.uber.org/zap" -) - -type rebuilder struct { - mtx *sync.Mutex - wg *sync.WaitGroup - cancel func() - limiter RebuildWorkerLimiter -} - -func newRebuilder(l RebuildWorkerLimiter) *rebuilder { - return &rebuilder{ - mtx: &sync.Mutex{}, - wg: &sync.WaitGroup{}, - cancel: nil, - limiter: l, - } -} - -func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) { - r.mtx.Lock() - defer r.mtx.Unlock() - - r.start(ctx, bs, mb, log) -} - -func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) { - if r.cancel != nil { - r.stop(log) - } - ctx, cancel := context.WithCancel(ctx) - r.cancel = cancel - r.wg.Add(1) - go func() { - defer r.wg.Done() - - log.Info(logs.BlobstoreRebuildStarted) - if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, r.limiter); err != nil { - log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err)) - } else { - log.Info(logs.BlobstoreRebuildCompletedSuccessfully) - } - }() -} - -func (r *rebuilder) Stop(log *logger.Logger) { - r.mtx.Lock() - defer r.mtx.Unlock() - - r.stop(log) -} - -func (r *rebuilder) stop(log *logger.Logger) { - if r.cancel == nil { - return - } - - r.cancel() - r.wg.Wait() - r.cancel = nil - log.Info(logs.BlobstoreRebuildStopped) -} - -var errMBIsNotAvailable = errors.New("metabase is not available") - -type mbStorageIDUpdate struct { - mb *meta.DB -} - -func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - if u.mb == nil { - return errMBIsNotAvailable - } - - var prm meta.UpdateStorageIDPrm - prm.SetAddress(addr) - prm.SetStorageID(storageID) - _, err := u.mb.UpdateStorageID(ctx, prm) - return err -} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index ac389b50..1eaee881 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -151,7 +151,7 @@ func defaultCfg() *cfg { log: &logger.Logger{Logger: zap.L()}, gcCfg: defaultGCCfg(), reportErrorFunc: func(string, string, error) {}, - rebuildLimiter: &noopRebuildLimiter{}, + rebuildLimiter: newRebuildLimiter(1), zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, zeroCountContainersCallback: func(context.Context, []cid.ID) {}, }