frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go

package blobovniczatree
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
const rebuildSuffix = ".rebuild"
var (
errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
errBatchFull = errors.New("batch full")
)
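
// Rebuild completes any moves left over from a previous rebuild, then migrates
// objects out of blobovniczas that no longer match the tree schema or whose
// fill percent is outside the accepted range. It returns common.ErrReadOnly in
// read-only mode and reports status and progress via metrics.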
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
if b.readOnly {
return common.RebuildRes{}, common.ErrReadOnly
}
b.metrics.SetRebuildStatus(rebuildStatusRunning)
b.metrics.SetRebuildPercent(0)
success := true
defer func() {
if success {
b.metrics.SetRebuildStatus(rebuildStatusCompleted)
} else {
b.metrics.SetRebuildStatus(rebuildStatusFailed)
}
}()
b.rebuildGuard.Lock()
defer b.rebuildGuard.Unlock()
var res common.RebuildRes
b.log.Debug(ctx, logs.BlobovniczaTreeCompletingPreviousRebuild)
completedPreviousMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
res.ObjectsMoved += completedPreviousMoves
if err != nil {
b.log.Warn(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
success = false
return res, err
}
b.log.Debug(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
b.log.Debug(ctx, logs.BlobovniczaTreeCollectingDBToRebuild)
dbsToMigrate, err := b.getDBsToRebuild(ctx, prm.FillPercent)
if err != nil {
b.log.Warn(ctx, logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
success = false
return res, err
}
b.log.Info(ctx, logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
res, err = b.migrateDBs(ctx, dbsToMigrate, prm, res)
if err != nil {
success = false
}
return res, err
}
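
// migrateDBs rebuilds the given databases one by one, accumulating the number
// of moved objects and removed files in res and updating the progress metric.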
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
var completedDBCount uint32
for _, db := range dbs {
b.log.Debug(ctx, logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
res.ObjectsMoved += movedObjects
if err != nil {
b.log.Warn(ctx, logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
return res, err
}
b.log.Debug(ctx, logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
res.FilesRemoved++
completedDBCount++
b.metrics.SetRebuildPercent((100 * completedDBCount) / uint32(len(dbs)))
}
b.metrics.SetRebuildPercent(100)
return res, nil
}
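
// getDBsToRebuild returns the union of databases that do not match the current
// tree schema and databases whose fill percent is outside the accepted range.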
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context, fillPercent int) ([]string, error) {
withSchemaChange, err := b.selectDBsDoNotMatchSchema(ctx)
if err != nil {
return nil, err
}
withFillPercent, err := b.selectDBsDoNotMatchFillPercent(ctx, fillPercent)
if err != nil {
return nil, err
}
for k := range withFillPercent {
withSchemaChange[k] = struct{}{}
}
result := make([]string, 0, len(withSchemaChange))
for db := range withSchemaChange {
result = append(result, db)
}
return result, nil
}
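
// selectDBsDoNotMatchSchema returns databases that exist on disk but are not
// leaves of the current tree schema.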
func (b *Blobovniczas) selectDBsDoNotMatchSchema(ctx context.Context) (map[string]struct{}, error) {
dbsToMigrate := make(map[string]struct{})
if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
dbsToMigrate[s] = struct{}{}
return false, nil
}); err != nil {
return nil, err
}
if err := b.iterateSortedLeaves(ctx, nil, func(s string) (bool, error) {
delete(dbsToMigrate, s)
return false, nil
}); err != nil {
return nil, err
}
return dbsToMigrate, nil
}
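
// selectDBsDoNotMatchFillPercent returns databases whose fill percent is
// outside the accepted range. The database with the maximum index on each
// level is skipped, since it may be the active one.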
func (b *Blobovniczas) selectDBsDoNotMatchFillPercent(ctx context.Context, target int) (map[string]struct{}, error) {
if target <= 0 || target > 100 {
return nil, fmt.Errorf("invalid fill percent value %d: must be (0; 100]", target)
}
result := make(map[string]struct{})
if err := b.iterateDeepest(ctx, oid.Address{}, func(lvlPath string) (bool, error) {
dir := filepath.Join(b.rootPath, lvlPath)
entries, err := os.ReadDir(dir)
if os.IsNotExist(err) { // the tree is not initialized yet
return false, nil
}
if err != nil {
return false, err
}
hasDBs := false
// the db with maxIdx could be the active one, so it should not be rebuilt
var maxIdx uint64
for _, e := range entries {
if e.IsDir() || strings.HasSuffix(e.Name(), rebuildSuffix) {
continue
}
hasDBs = true
maxIdx = max(u64FromHexString(e.Name()), maxIdx)
}
if !hasDBs {
return false, nil
}
for _, e := range entries {
if e.IsDir() || strings.HasSuffix(e.Name(), rebuildSuffix) {
continue
}
if u64FromHexString(e.Name()) == maxIdx {
continue
}
path := filepath.Join(lvlPath, e.Name())
resettlementRequired, err := b.rebuildBySize(path, target)
if err != nil {
return false, err
}
if resettlementRequired {
result[path] = struct{}{}
}
}
return false, nil
}); err != nil {
return nil, err
}
return result, nil
}
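
// rebuildBySize reports whether the database at path has to be resettled
// based on its fill percent.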
func (b *Blobovniczas) rebuildBySize(path string, targetFillPercent int) (bool, error) {
shDB := b.getBlobovnicza(path)
blz, err := shDB.Open()
if err != nil {
return false, err
}
defer shDB.Close()
fp := blz.FillPercent()
// the accepted fill percent range is defined as
// |----|+++++++++++++++++|+++++++++++++++++|---------------
// 0%   target            100%              100+(100 - target)
// where `+` is an accepted fill percent and `-` is not
return fp < targetFillPercent || fp > 100+(100-targetFillPercent), nil
}
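
// rebuildDB moves all objects out of the database at path and then drops it.
// A ".rebuild" marker file is created for the duration of the operation and
// removed only on success, so an interrupted rebuild can be completed later.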
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
shDB := b.getBlobovnicza(path)
blz, err := shDB.Open()
if err != nil {
return 0, err
}
shDBClosed := false
defer func() {
if shDBClosed {
return
}
shDB.Close()
}()
dropTempFile, err := b.addRebuildTempFile(path)
if err != nil {
return 0, err
}
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
if err != nil {
return migratedObjects, err
}
shDBClosed, err = b.dropDB(ctx, path, shDB)
if err == nil {
// remove the temp file only on success, so that an interrupted rebuild can be continued later
dropTempFile()
}
return migratedObjects, err
}
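
// addRebuildTempFile creates a ".rebuild" marker file next to the database
// and returns a function that removes it.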
func (b *Blobovniczas) addRebuildTempFile(path string) (func(), error) {
sysPath := filepath.Join(b.rootPath, path)
sysPath = sysPath + rebuildSuffix
f, err := os.OpenFile(sysPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, b.perm)
if err != nil {
return nil, err
}
// the marker file content is never used, so close the descriptor right away
_ = f.Close()
return func() {
if err := os.Remove(sysPath); err != nil {
b.log.Warn(context.Background(), logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
}
}, nil
}
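
// moveObjects iterates over the source blobovnicza in batches and moves the
// objects concurrently, bounded by the workers limiter. It returns the number
// of successfully moved objects.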
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
var result atomic.Uint64
batch := make(map[oid.Address][]byte)
var prm blobovnicza.IteratePrm
prm.DecodeAddresses()
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
batch[ie.Address()] = bytes.Clone(ie.ObjectData())
if len(batch) == b.blzMoveBatchSize {
return errBatchFull
}
return nil
})
for {
_, err := blz.Iterate(ctx, prm)
if err != nil && !errors.Is(err, errBatchFull) {
return result.Load(), err
}
if len(batch) == 0 {
break
}
eg, egCtx := errgroup.WithContext(ctx)
for addr, data := range batch {
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
_ = eg.Wait()
return result.Load(), err
}
eg.Go(func() error {
defer limiter.ReleaseWorkSlot()
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
if err == nil {
result.Add(1)
}
return err
})
}
if err := eg.Wait(); err != nil {
return result.Load(), err
}
batch = make(map[oid.Address][]byte)
}
return result.Load(), nil
}
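
// moveObject moves a single object to an active database on the deepest level
// of the tree and records the move duration metric.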
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
addr oid.Address, data []byte, metaStore common.MetaStorage,
) error {
startedAt := time.Now()
defer func() {
b.metrics.ObjectMoved(time.Since(startedAt))
}()
it := &moveIterator{
B: b,
ID: nil,
AllFull: true,
Address: addr,
ObjectData: data,
MetaStore: metaStore,
Source: source,
SourceSysPath: sourcePath,
}
if err := b.iterateDeepest(ctx, addr, func(lvlPath string) (bool, error) { return it.tryMoveToLvl(ctx, lvlPath) }); err != nil {
return err
} else if it.ID == nil {
if it.AllFull {
return common.ErrNoSpace
}
return errPutFailed
}
return nil
}
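
// dropDB closes and removes the database file after a grace period, evicts it
// from the cache and cleans up empty parent directories. The returned bool
// reports whether the database was closed and removed.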
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
case <-time.After(b.waitBeforeDropDB): // wait for in-flight requests with the old storage ID to complete
}
b.dbCache.EvictAndMarkNonCached(path)
defer b.dbCache.RemoveFromNonCached(path)
b.dbFilesGuard.Lock()
defer b.dbFilesGuard.Unlock()
if err := shDb.CloseAndRemoveFile(); err != nil {
return false, err
}
b.commondbManager.CleanResources(path)
if err := b.dropDirectoryIfEmpty(filepath.Dir(path)); err != nil {
return true, err
}
return true, nil
}
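
// dropDirectoryIfEmpty removes the directory if it is empty and then
// recursively tries to remove its parent directories up to the tree root.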
func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
if path == "." {
return nil
}
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if err != nil {
return err
}
if len(entries) > 0 {
return nil
}
if err := os.Remove(sysPath); err != nil {
return err
}
return b.dropDirectoryIfEmpty(filepath.Dir(path))
}
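
// completeIncompletedMove finishes object moves recorded in databases that
// still have a ".rebuild" marker file and removes the markers afterwards.
// It returns the number of completed moves.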
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
var count uint64
var rebuildTempFilesToRemove []string
err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) {
rebuildTmpFilePath := s
s = strings.TrimSuffix(s, rebuildSuffix)
shDB := b.getBlobovnicza(s)
blz, err := shDB.Open()
if err != nil {
return true, err
}
defer shDB.Close()
incompletedMoves, err := blz.ListMoveInfo(ctx)
if err != nil {
return true, err
}
for _, move := range incompletedMoves {
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
return true, err
}
count++
}
rebuildTempFilesToRemove = append(rebuildTempFilesToRemove, rebuildTmpFilePath)
return false, nil
})
for _, tmp := range rebuildTempFilesToRemove {
if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil {
b.log.Warn(ctx, logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
}
}
return count, err
}
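
// performMove finishes a single recorded move: the object is copied to the
// target database if it is not there yet, the storage ID is updated in the
// meta storage, and the object and the move info are removed from the source.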
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
move blobovnicza.MoveInfo, metaStore common.MetaStorage,
) error {
targetDB := b.getBlobovnicza(NewIDFromBytes(move.TargetStorageID).Path())
target, err := targetDB.Open()
if err != nil {
return err
}
defer targetDB.Close()
existsInSource := true
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(move.Address)
gRes, err := source.Get(ctx, gPrm)
if err != nil {
if client.IsErrObjectNotFound(err) {
existsInSource = false
} else {
b.log.Warn(ctx, logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
return err
}
}
if !existsInSource { // object was deleted by Rebuild, need to delete move info
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
b.log.Warn(ctx, logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
return err
}
b.deleteProtectedObjects.Delete(move.Address)
return nil
}
existsInTarget, err := target.Exists(ctx, move.Address)
if err != nil {
b.log.Warn(ctx, logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
return err
}
if !existsInTarget {
var putPrm blobovnicza.PutPrm
putPrm.SetAddress(move.Address)
putPrm.SetMarshaledObject(gRes.Object())
_, err = target.Put(ctx, putPrm)
if err != nil {
b.log.Warn(ctx, logs.BlobovniczatreeCouldNotPutObjectToTargetDB, zap.String("path", targetDB.SystemPath()), zap.Error(err))
return err
}
}
if err = metaStore.UpdateStorageID(ctx, move.Address, move.TargetStorageID); err != nil {
b.log.Warn(ctx, logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", move.Address))
return err
}
var deletePrm blobovnicza.DeletePrm
deletePrm.SetAddress(move.Address)
if _, err = source.Delete(ctx, deletePrm); err != nil {
b.log.Warn(ctx, logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", sourcePath), zap.Error(err))
return err
}
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
b.log.Warn(ctx, logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
return err
}
b.deleteProtectedObjects.Delete(move.Address)
return nil
}
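
// moveIterator holds the state of a single object move between blobovniczas.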
type moveIterator struct {
B *Blobovniczas
ID *ID
AllFull bool
Address oid.Address
ObjectData []byte
MetaStore common.MetaStorage
Source *blobovnicza.Blobovnicza
SourceSysPath string
}
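
// tryMoveToLvl tries to move the object to the active database of the given
// level: it records move info in the source, puts the object into the target,
// updates the storage ID in the meta storage and then removes the object and
// the move info from the source. The returned bool reports whether iteration
// over levels should stop.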
func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, error) {
target, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
if err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
} else {
i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, zap.Error(err))
}
return false, nil
}
if target == nil {
i.B.log.Warn(ctx, logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
return false, nil
}
defer target.Close()
i.AllFull = false
targetIDx := u64FromHexString(filepath.Base(target.SystemPath()))
targetStorageID := NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(targetIDx))))
if err = i.Source.PutMoveInfo(ctx, blobovnicza.MoveInfo{
Address: i.Address,
TargetStorageID: targetStorageID.Bytes(),
}); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, err)
} else {
i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
i.B.deleteProtectedObjects.Add(i.Address)
var putPrm blobovnicza.PutPrm
putPrm.SetAddress(i.Address)
putPrm.SetMarshaledObject(i.ObjectData)
putPrm.SetForce(true)
_, err = target.Blobovnicza().Put(ctx, putPrm)
if err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
} else {
i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, zap.String("path", target.SystemPath()), zap.Error(err))
}
return true, nil
}
if err = i.MetaStore.UpdateStorageID(ctx, i.Address, targetStorageID.Bytes()); err != nil {
i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", i.Address))
return true, nil
}
var deletePrm blobovnicza.DeletePrm
deletePrm.SetAddress(i.Address)
if _, err = i.Source.Delete(ctx, deletePrm); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotDeleteFromSource, err)
} else {
i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
if err = i.Source.DropMoveInfo(ctx, i.Address); err != nil {
if !isLogical(err) {
i.B.reportError(logs.BlobovniczatreeCouldNotDropMoveInfo, err)
} else {
i.B.log.Warn(ctx, logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", i.SourceSysPath), zap.Error(err))
}
return true, nil
}
i.B.deleteProtectedObjects.Delete(i.Address)
i.ID = targetStorageID
return true, nil
}
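
// addressMap is a mutex-protected set of object addresses.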
type addressMap struct {
data map[oid.Address]struct{}
guard *sync.RWMutex
}
func newAddressMap() *addressMap {
return &addressMap{
data: make(map[oid.Address]struct{}),
guard: &sync.RWMutex{},
}
}
func (m *addressMap) Add(address oid.Address) {
m.guard.Lock()
defer m.guard.Unlock()
m.data[address] = struct{}{}
}
func (m *addressMap) Delete(address oid.Address) {
m.guard.Lock()
defer m.guard.Unlock()
delete(m.data, address)
}
func (m *addressMap) Contains(address oid.Address) bool {
m.guard.RLock()
defer m.guard.RUnlock()
_, contains := m.data[address]
return contains
}