package blobovniczatree

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

// rebuildSuffix is appended to a DB path to form the name of the marker file
// that exists while that DB is being rebuilt.
const rebuildSuffix = ".rebuild"

var (
	errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
	// errBatchFull is returned by the iteration handler in moveObjects to stop
	// iterating once a full batch of objects has been collected.
	errBatchFull = errors.New("batch full")
)

// Rebuild completes moves left over from a previous rebuild, selects the DBs
// that have to be rebuilt and moves their objects to other DBs.
//
// It returns common.ErrReadOnly if the tree is read-only. The returned
// common.RebuildRes accumulates the counters collected up to the point of
// return, including on error. The rebuild status metric is set to running at
// the start and to completed or failed on return.
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
	if b.readOnly {
		return common.RebuildRes{}, common.ErrReadOnly
	}

	b.metrics.SetRebuildStatus(rebuildStatusRunning)
	b.metrics.SetRebuildPercent(0)
	success := true
	defer func() {
		if success {
			b.metrics.SetRebuildStatus(rebuildStatusCompleted)
		} else {
			b.metrics.SetRebuildStatus(rebuildStatusFailed)
		}
	}()

	b.rebuildGuard.Lock()
	defer b.rebuildGuard.Unlock()

	var res common.RebuildRes

	b.log.Debug(ctx, logs.BlobovniczaTreeCompletingPreviousRebuild)
	completedPreviousMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
	res.ObjectsMoved += completedPreviousMoves
	if err != nil {
		b.log.Warn(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
		success = false
		return res, err
	}
	b.log.Debug(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)

	b.log.Debug(ctx, logs.BlobovniczaTreeCollectingDBToRebuild)
	dbsToMigrate, err := b.getDBsToRebuild(ctx, prm.FillPercent)
	if err != nil {
		b.log.Warn(ctx, logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
		success = false
		return res, err
	}

	b.log.Info(ctx, logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
	res, err = b.migrateDBs(ctx, dbsToMigrate, prm, res)
	if err != nil {
		success = false
	}
	return res, err
}

// migrateDBs rebuilds every DB in dbs one after another, adding the number of
// moved objects and removed files to res and updating the rebuild percent
// metric after each DB. It stops at the first failure and returns the
// counters accumulated so far together with the error.
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
	var completedDBCount uint32
	for _, db := range dbs {
		b.log.Debug(ctx, logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
		movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
		// Objects moved before a failure still count.
		res.ObjectsMoved += movedObjects
		if err != nil {
			b.log.Warn(ctx, logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
			return res, err
		}
		b.log.Debug(ctx, logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
		res.FilesRemoved++
		completedDBCount++
		b.metrics.SetRebuildPercent((100 * completedDBCount) / uint32(len(dbs)))
	}
	b.metrics.SetRebuildPercent(100)
	return res, nil
}

// getDBsToRebuild returns the union of the DB paths selected by
// selectDBsDoNotMatchSchema and selectDBsDoNotMatchFillPercent.
// The order of the returned paths is unspecified (map iteration order).
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context, fillPercent int) ([]string, error) {
	withSchemaChange, err := b.selectDBsDoNotMatchSchema(ctx)
	if err != nil {
		return nil, err
	}
	withFillPercent, err := b.selectDBsDoNotMatchFillPercent(ctx, fillPercent)
	if err != nil {
		return nil, err
	}
	// Merge the second set into the first one to deduplicate paths.
	for k := range withFillPercent {
		withSchemaChange[k] = struct{}{}
	}
	result := make([]string, 0, len(withSchemaChange))
	for db := range withSchemaChange {
		result = append(result, db)
	}
	return result, nil
}

// selectDBsDoNotMatchSchema returns the existing DB paths that are not
// reported by iterateSortedLeaves: it collects every path yielded by
// iterateExistingDBPaths and then removes those yielded by iterateSortedLeaves.
func (b *Blobovniczas) selectDBsDoNotMatchSchema(ctx context.Context) (map[string]struct{}, error) {
	dbsToMigrate := make(map[string]struct{})
	if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
		dbsToMigrate[s] = struct{}{}
		return false, nil
	}); err != nil {
		return nil, err
	}
	if err := b.iterateSortedLeaves(ctx, nil, func(s string) (bool, error) {
		delete(dbsToMigrate, s)
		return false, nil
	}); err != nil {
		return nil, err
	}
	return dbsToMigrate, nil
}

// selectDBsDoNotMatchFillPercent returns the DB paths whose fill percent is
// outside the accepted range for target (see rebuildBySize). In every level
// directory the DB with the highest index is skipped. Directory entries that
// are directories or rebuild marker files are ignored.
//
// target must be in (0; 100], otherwise an error is returned.
func (b *Blobovniczas) selectDBsDoNotMatchFillPercent(ctx context.Context, target int) (map[string]struct{}, error) {
	if target <= 0 || target > 100 {
		return nil, fmt.Errorf("invalid fill percent value %d: must be (0; 100]", target)
	}
	result := make(map[string]struct{})
	if err := b.iterateDeepest(ctx, oid.Address{}, func(lvlPath string) (bool, error) {
		dir := filepath.Join(b.rootPath, lvlPath)
		entries, err := os.ReadDir(dir)
		if os.IsNotExist(err) { // the tree is not initialized
			return false, nil
		}
		if err != nil {
			return false, err
		}
		hasDBs := false
		// The DB with the highest index may be the active one,
		// so it must not be rebuilt.
		var maxIdx uint64
		for _, e := range entries {
			if e.IsDir() || strings.HasSuffix(e.Name(), rebuildSuffix) {
				continue
			}
			hasDBs = true
			maxIdx = max(u64FromHexString(e.Name()), maxIdx)
		}
		if !hasDBs {
			return false, nil
		}
		for _, e := range entries {
			if e.IsDir() || strings.HasSuffix(e.Name(), rebuildSuffix) {
				continue
			}
			if u64FromHexString(e.Name()) == maxIdx {
				continue
			}
			path := filepath.Join(lvlPath, e.Name())
			resettlementRequired, err := b.rebuildBySize(ctx, path, target)
			if err != nil {
				return false, err
			}
			if resettlementRequired {
				result[path] = struct{}{}
			}
		}
		return false, nil
	}); err != nil {
		return nil, err
	}
	return result, nil
}

// rebuildBySize opens the DB at path and reports whether its fill percent is
// outside the accepted range for targetFillPercent.
func (b *Blobovniczas) rebuildBySize(ctx context.Context, path string, targetFillPercent int) (bool, error) {
	shDB := b.getBlobovnicza(ctx, path)
	blz, err := shDB.Open(ctx)
	if err != nil {
		return false, err
	}
	defer shDB.Close()
	fp := blz.FillPercent()
	// The accepted fill percent is defined as
	// |----|+++++++++++++++++|+++++++++++++++++|---------------
	// 0%  target            100%         100+(100 - target)
	// where `+` is an accepted fill percent and `-` is a not accepted one.
	return fp < targetFillPercent || fp > 100+(100-targetFillPercent), nil
}

// rebuildDB moves all objects out of the DB at path and then drops the DB.
// A rebuild marker file is created before moving and removed only after the
// DB has been dropped successfully, so a failed rebuild leaves the marker in
// place. It returns the number of objects moved, including on error.
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
	shDB := b.getBlobovnicza(ctx, path)
	blz, err := shDB.Open(ctx)
	if err != nil {
		return 0, err
	}
	shDBClosed := false
	defer func() {
		// dropDB reports whether it has already closed the DB.
		if shDBClosed {
			return
		}
		shDB.Close()
	}()
	dropTempFile, err := b.addRebuildTempFile(path)
	if err != nil {
		return 0, err
	}
	migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
	if err != nil {
		return migratedObjects, err
	}
	shDBClosed, err = b.dropDB(ctx, path, shDB)
	if err == nil {
		// drop only on success to continue rebuild on error
		dropTempFile()
	}
	return migratedObjects, err
}

// addRebuildTempFile creates the rebuild marker file for the DB at path
// (the DB system path plus rebuildSuffix). Creation fails if the marker
// already exists (os.O_EXCL). The returned function removes the marker and
// logs a warning if the removal fails.
func (b *Blobovniczas) addRebuildTempFile(path string) (func(), error) {
	sysPath := filepath.Join(b.rootPath, path)
	sysPath += rebuildSuffix
	f, err := os.OpenFile(sysPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, b.perm)
	if err != nil {
		return nil, err
	}
	// Only the existence of the marker matters, so release the descriptor
	// right away instead of leaking it. Nothing was written to the file,
	// so a Close error cannot lose data and is deliberately ignored.
	_ = f.Close()
	return func() {
		if err := os.Remove(sysPath); err != nil {
			b.log.Warn(context.Background(), logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
		}
	}, nil
}

// moveObjects moves all objects from blz to other DBs in batches of at most
// b.blzMoveBatchSize objects. Objects of one batch are moved concurrently,
// each move holding a work slot of limiter. Iteration restarts after every
// batch and ends when it yields no objects. It returns the number of
// successfully moved objects, including on error.
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
	var result atomic.Uint64
	batch := make(map[oid.Address][]byte)

	var prm blobovnicza.IteratePrm
	prm.DecodeAddresses()
	prm.SetHandler(func(ie blobovnicza.IterationElement) error {
		// Copy the data so the batch can be used after Iterate has returned.
		batch[ie.Address()] = bytes.Clone(ie.ObjectData())
		if len(batch) == b.blzMoveBatchSize {
			return errBatchFull
		}
		return nil
	})

	for {
		_, err := blz.Iterate(ctx, prm)
		// errBatchFull only signals that the batch is ready to be processed.
		if err != nil && !errors.Is(err, errBatchFull) {
			return result.Load(), err
		}

		if len(batch) == 0 {
			break
		}

		eg, egCtx := errgroup.WithContext(ctx)

		// NOTE(review): the closure below captures addr and data; this is
		// correct with per-iteration loop variables (Go 1.22+) — confirm
		// the module's Go version.
		for addr, data := range batch {
			if err := limiter.AcquireWorkSlot(egCtx); err != nil {
				// Wait for the moves already started; the acquire error is
				// the one reported to the caller.
				_ = eg.Wait()
				return result.Load(), err
			}
			eg.Go(func() error {
				defer limiter.ReleaseWorkSlot()
				err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
				if err == nil {
					result.Add(1)
				}
				return err
			})
		}
		if err := eg.Wait(); err != nil {
			return result.Load(), err
		}

		batch = make(map[oid.Address][]byte)
	}

	return result.Load(), nil
}

// moveObject moves a single object from source to a DB selected through
// iterateDeepest for addr. If no DB accepted the object, it returns
// common.ErrNoSpace when no level provided an active DB, and errPutFailed
// otherwise. The time spent is reported to the metrics in every case.
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
	addr oid.Address, data []byte, metaStore common.MetaStorage,
) error {
	startedAt := time.Now()
	defer func() {
		b.metrics.ObjectMoved(time.Since(startedAt))
	}()
	it := &moveIterator{
		B:             b,
		ID:            nil,
		AllFull:       true,
		Address:       addr,
		ObjectData:    data,
		MetaStore:     metaStore,
		Source:        source,
		SourceSysPath: sourcePath,
	}

	if err := b.iterateDeepest(ctx, addr, func(lvlPath string) (bool, error) { return it.tryMoveToLvl(ctx, lvlPath) }); err != nil {
		return err
	}
	if it.ID != nil {
		return nil
	}
	// The object was not moved anywhere.
	if it.AllFull {
		return common.ErrNoSpace
	}
	return errPutFailed
}

// dropDB removes the DB file at path after waiting b.waitBeforeDropDB (or
// until ctx is done), then cleans up its resources and removes parent
// directories that became empty. The returned flag reports whether shDb has
// been closed and its file removed.
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
	select {
	case <-ctx.Done():
		return false, ctx.Err()
	case <-time.After(b.waitBeforeDropDB): // to complete requests with old storage ID
	}

	b.dbCache.EvictAndMarkNonCached(path)
	defer b.dbCache.RemoveFromNonCached(path)

	b.dbFilesGuard.Lock()
	defer b.dbFilesGuard.Unlock()

	if err := shDb.CloseAndRemoveFile(); err != nil {
		return false, err
	}
	b.commondbManager.CleanResources(path)
	if err := b.dropDirectoryIfEmpty(filepath.Dir(path)); err != nil {
		// The DB itself is already closed and removed at this point.
		return true, err
	}
	return true, nil
}

// dropDirectoryIfEmpty removes the directory at path (relative to
// b.rootPath) if it has no entries and then does the same for its parents.
// The recursion stops at "." or at the first non-empty directory.
func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
	if path == "." {
		return nil
	}

	sysPath := filepath.Join(b.rootPath, path)
	entries, err := os.ReadDir(sysPath)
	if err != nil {
		return err
	}
	if len(entries) > 0 {
		return nil
	}
	if err := os.Remove(sysPath); err != nil {
		return err
	}
	return b.dropDirectoryIfEmpty(filepath.Dir(path))
}

// completeIncompletedMove finishes the moves recorded in every DB that still
// has a rebuild marker file. Markers of DBs whose recorded moves were all
// performed are removed afterwards, even if a later DB failed. It returns
// the number of performed moves and the first error encountered.
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
	var count uint64
	var rebuildTempFilesToRemove []string
	err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) {
		rebuildTmpFilePath := s
		// The DB path is the marker path without the suffix.
		s = strings.TrimSuffix(s, rebuildSuffix)
		shDB := b.getBlobovnicza(ctx, s)
		blz, err := shDB.Open(ctx)
		if err != nil {
			return true, err
		}
		defer shDB.Close()

		incompletedMoves, err := blz.ListMoveInfo(ctx)
		if err != nil {
			return true, err
		}

		for _, move := range incompletedMoves {
			if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
				return true, err
			}
			count++
		}

		rebuildTempFilesToRemove = append(rebuildTempFilesToRemove, rebuildTmpFilePath)
		return false, nil
	})
	for _, tmp := range rebuildTempFilesToRemove {
		if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil {
			b.log.Warn(ctx, logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
		}
	}
	return count, err
}

// performMove completes one recorded move of an object from source to the
// target DB named by move.TargetStorageID. If the object is no longer in
// source, only the move info is dropped. Otherwise the object is put to the
// target unless it is already there, the storage ID is updated in metaStore,
// and the object and its move info are deleted from source.
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
	move blobovnicza.MoveInfo, metaStore common.MetaStorage,
) error {
	targetDB := b.getBlobovnicza(ctx, NewIDFromBytes(move.TargetStorageID).Path())
	target, err := targetDB.Open(ctx)
	if err != nil {
		return err
	}
	defer targetDB.Close()

	existsInSource := true
	var gPrm blobovnicza.GetPrm
	gPrm.SetAddress(move.Address)
	gRes, err := source.Get(ctx, gPrm)
	if err != nil {
		if client.IsErrObjectNotFound(err) {
			existsInSource = false
		} else {
			// NOTE(review): the log constant mentions the target DB although
			// the failed call reads from source — confirm the intended message.
			b.log.Warn(ctx, logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
			return err
		}
	}

	if !existsInSource { // object was deleted by Rebuild, need to delete move info
		if err = source.DropMoveInfo(ctx, move.Address); err != nil {
			b.log.Warn(ctx, logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
			return err
		}
		b.deleteProtectedObjects.Delete(move.Address)
		return nil
	}

	existsInTarget, err := target.Exists(ctx, move.Address)
	if err != nil {
		b.log.Warn(ctx, logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
		return err
	}

	if !existsInTarget {
		var putPrm blobovnicza.PutPrm
		putPrm.SetAddress(move.Address)
		putPrm.SetMarshaledObject(gRes.Object())
		_, err = target.Put(ctx, putPrm)
		if err != nil {
			b.log.Warn(ctx, logs.BlobovniczatreeCouldNotPutObjectToTargetDB, zap.String("path", targetDB.SystemPath()), zap.Error(err))
			return err
		}
	}

	if err = metaStore.UpdateStorageID(ctx, move.Address, move.TargetStorageID); err != nil {
		b.log.Warn(ctx, logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", move.Address))
		return err
	}

	var deletePrm blobovnicza.DeletePrm
	deletePrm.SetAddress(move.Address)
	if _, err = source.Delete(ctx, deletePrm); err != nil {
		b.log.Warn(ctx, logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", sourcePath), zap.Error(err))
		return err
	}

	if err = source.DropMoveInfo(ctx, move.Address); err != nil {
		b.log.Warn(ctx, logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
		return err
	}

	b.deleteProtectedObjects.Delete(move.Address)
	return nil
}

// moveIterator carries the state of a single object move across the levels
// visited by iterateDeepest.
type moveIterator struct {
	B *Blobovniczas
	// ID is the storage ID of the target DB; it stays nil until the move
	// has fully succeeded.
	ID *ID
	// AllFull stays true while no visited level provided an active DB.
	AllFull       bool
	Address       oid.Address
	ObjectData    []byte
	MetaStore     common.MetaStorage
	Source        *blobovnicza.Blobovnicza
	SourceSysPath string
}

// tryMoveToLvl tries to move the object to the active DB of level lvlPath.
// The steps are: record move info in the source, put the object to the
// target, update the storage ID in the meta storage, delete the object from
// the source and drop the move info. i.ID is set only if all steps succeed.
//
// Failures are logged or reported and never returned as an error. The
// boolean result is false when no active DB could be obtained for the level
// and true once a move has been attempted; presumably true stops
// iterateDeepest — confirm against its implementation.
func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, error) {
	target, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(ctx, lvlPath)
	if err != nil {
		if !isLogical(err) {
			i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
		} else {
			i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, zap.Error(err))
		}
		return false, nil
	}

	if target == nil {
		i.B.log.Warn(ctx, logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
		return false, nil
	}
	defer target.Close()

	i.AllFull = false

	targetIDx := u64FromHexString(filepath.Base(target.SystemPath()))
	targetStorageID := NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(targetIDx))))

	// Record the intended move first so that an interrupted move can be
	// completed later (see completeIncompletedMove).
	if err = i.Source.PutMoveInfo(ctx, blobovnicza.MoveInfo{
		Address:         i.Address,
		TargetStorageID: targetStorageID.Bytes(),
	}); err != nil {
		if !isLogical(err) {
			i.B.reportError(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, err)
		} else {
			i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, zap.String("path", i.SourceSysPath), zap.Error(err))
		}
		return true, nil
	}
	// The address stays in deleteProtectedObjects until the move completes.
	i.B.deleteProtectedObjects.Add(i.Address)

	var putPrm blobovnicza.PutPrm
	putPrm.SetAddress(i.Address)
	putPrm.SetMarshaledObject(i.ObjectData)
	putPrm.SetForce(true)

	_, err = target.Blobovnicza().Put(ctx, putPrm)
	if err != nil {
		if !isLogical(err) {
			i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
		} else {
			i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, zap.String("path", target.SystemPath()), zap.Error(err))
		}
		return true, nil
	}

	if err = i.MetaStore.UpdateStorageID(ctx, i.Address, targetStorageID.Bytes()); err != nil {
		i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", i.Address))
		return true, nil
	}

	var deletePrm blobovnicza.DeletePrm
	deletePrm.SetAddress(i.Address)
	if _, err = i.Source.Delete(ctx, deletePrm); err != nil {
		if !isLogical(err) {
			i.B.reportError(logs.BlobovniczatreeCouldNotDeleteFromSource, err)
		} else {
			i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", i.SourceSysPath), zap.Error(err))
		}
		return true, nil
	}

	if err = i.Source.DropMoveInfo(ctx, i.Address); err != nil {
		if !isLogical(err) {
			i.B.reportError(logs.BlobovniczatreeCouldNotDropMoveInfo, err)
		} else {
			i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", i.SourceSysPath), zap.Error(err))
		}
		return true, nil
	}
	i.B.deleteProtectedObjects.Delete(i.Address)

	i.ID = targetStorageID
	return true, nil
}

// addressMap is a set of object addresses guarded by a read-write mutex.
type addressMap struct {
	data  map[oid.Address]struct{}
	guard *sync.RWMutex
}

// newAddressMap returns an empty, ready-to-use addressMap.
func newAddressMap() *addressMap {
	return &addressMap{
		data:  make(map[oid.Address]struct{}),
		guard: &sync.RWMutex{},
	}
}

// Add inserts address into the set.
func (m *addressMap) Add(address oid.Address) {
	m.guard.Lock()
	defer m.guard.Unlock()

	m.data[address] = struct{}{}
}

// Delete removes address from the set; it is a no-op if address is absent.
func (m *addressMap) Delete(address oid.Address) {
	m.guard.Lock()
	defer m.guard.Unlock()

	delete(m.data, address)
}

// Contains reports whether address is in the set.
func (m *addressMap) Contains(address oid.Address) bool {
	m.guard.RLock()
	defer m.guard.RUnlock()

	_, contains := m.data[address]
	return contains
}