Dmitrii Stepanov
5b8200de88
If a blobovnicza contains objects larger than the configured object size parameter, the rebuild fails with an error because no matching bucket exists in the database. This commit forces the bucket to be created on rebuild. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
479 lines
13 KiB
Go
479 lines
13 KiB
Go
package blobovniczatree
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// Sentinel errors shared by the rebuild code in this file.
var (
	// errRebuildInProgress signals that an operation was refused because a
	// rebuild is currently running.
	errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")

	// errBatchFull is returned by the iteration handler in moveObjects to stop
	// the current iteration once the batch has reached its configured size.
	errBatchFull = errors.New("batch full")
)
|
|
|
|
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
|
|
if b.readOnly {
|
|
return common.RebuildRes{}, common.ErrReadOnly
|
|
}
|
|
|
|
b.metrics.SetRebuildStatus(rebuildStatusRunning)
|
|
b.metrics.SetRebuildPercent(0)
|
|
success := true
|
|
defer func() {
|
|
if success {
|
|
b.metrics.SetRebuildStatus(rebuildStatusCompleted)
|
|
} else {
|
|
b.metrics.SetRebuildStatus(rebuildStatusFailed)
|
|
}
|
|
}()
|
|
|
|
b.rebuildGuard.Lock()
|
|
defer b.rebuildGuard.Unlock()
|
|
|
|
var res common.RebuildRes
|
|
|
|
b.log.Debug(logs.BlobovniczaTreeCompletingPreviousRebuild)
|
|
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
|
|
res.ObjectsMoved += completedPreviosMoves
|
|
if err != nil {
|
|
b.log.Warn(logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
|
|
success = false
|
|
return res, err
|
|
}
|
|
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
|
|
|
|
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
|
|
dbsToMigrate, err := b.getDBsToRebuild(ctx)
|
|
if err != nil {
|
|
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
|
|
success = false
|
|
return res, err
|
|
}
|
|
|
|
b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
|
|
res, err = b.migrateDBs(ctx, dbsToMigrate, prm, res)
|
|
if err != nil {
|
|
success = false
|
|
}
|
|
return res, err
|
|
}
|
|
|
|
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
|
|
var completedDBCount uint32
|
|
for _, db := range dbs {
|
|
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
|
|
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
|
|
res.ObjectsMoved += movedObjects
|
|
if err != nil {
|
|
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
|
|
return res, err
|
|
}
|
|
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
|
|
res.FilesRemoved++
|
|
completedDBCount++
|
|
b.metrics.SetRebuildPercent((100 * completedDBCount) / uint32(len(dbs)))
|
|
}
|
|
b.metrics.SetRebuildPercent(100)
|
|
return res, nil
|
|
}
|
|
|
|
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
|
|
dbsToMigrate := make(map[string]struct{})
|
|
if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
|
|
dbsToMigrate[s] = struct{}{}
|
|
return false, nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := b.iterateSortedLeaves(ctx, nil, func(s string) (bool, error) {
|
|
delete(dbsToMigrate, s)
|
|
return false, nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
result := make([]string, 0, len(dbsToMigrate))
|
|
for db := range dbsToMigrate {
|
|
result = append(result, db)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
|
|
shDB := b.getBlobovnicza(path)
|
|
blz, err := shDB.Open()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
shDBClosed := false
|
|
defer func() {
|
|
if shDBClosed {
|
|
return
|
|
}
|
|
shDB.Close()
|
|
}()
|
|
|
|
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
|
|
if err != nil {
|
|
return migratedObjects, err
|
|
}
|
|
shDBClosed, err = b.dropDB(ctx, path, shDB)
|
|
return migratedObjects, err
|
|
}
|
|
|
|
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
|
|
var result atomic.Uint64
|
|
batch := make(map[oid.Address][]byte)
|
|
|
|
var prm blobovnicza.IteratePrm
|
|
prm.DecodeAddresses()
|
|
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
|
|
batch[ie.Address()] = bytes.Clone(ie.ObjectData())
|
|
if len(batch) == b.blzMoveBatchSize {
|
|
return errBatchFull
|
|
}
|
|
return nil
|
|
})
|
|
|
|
for {
|
|
_, err := blz.Iterate(ctx, prm)
|
|
if err != nil && !errors.Is(err, errBatchFull) {
|
|
return result.Load(), err
|
|
}
|
|
|
|
if len(batch) == 0 {
|
|
break
|
|
}
|
|
|
|
eg, egCtx := errgroup.WithContext(ctx)
|
|
|
|
for addr, data := range batch {
|
|
addr := addr
|
|
data := data
|
|
|
|
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
|
|
_ = eg.Wait()
|
|
return result.Load(), err
|
|
}
|
|
eg.Go(func() error {
|
|
defer limiter.ReleaseWorkSlot()
|
|
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
|
|
if err == nil {
|
|
result.Add(1)
|
|
}
|
|
return err
|
|
})
|
|
}
|
|
if err := eg.Wait(); err != nil {
|
|
return result.Load(), err
|
|
}
|
|
|
|
batch = make(map[oid.Address][]byte)
|
|
}
|
|
|
|
return result.Load(), nil
|
|
}
|
|
|
|
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
|
addr oid.Address, data []byte, metaStore common.MetaStorage,
|
|
) error {
|
|
startedAt := time.Now()
|
|
defer func() {
|
|
b.metrics.ObjectMoved(time.Since(startedAt))
|
|
}()
|
|
it := &moveIterator{
|
|
B: b,
|
|
ID: nil,
|
|
AllFull: true,
|
|
Address: addr,
|
|
ObjectData: data,
|
|
MetaStore: metaStore,
|
|
Source: source,
|
|
SourceSysPath: sourcePath,
|
|
}
|
|
|
|
if err := b.iterateDeepest(ctx, addr, func(lvlPath string) (bool, error) { return it.tryMoveToLvl(ctx, lvlPath) }); err != nil {
|
|
return err
|
|
} else if it.ID == nil {
|
|
if it.AllFull {
|
|
return common.ErrNoSpace
|
|
}
|
|
return errPutFailed
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return false, ctx.Err()
|
|
case <-time.After(b.waitBeforeDropDB): // to complete requests with old storage ID
|
|
}
|
|
|
|
b.dbCache.EvictAndMarkNonCached(path)
|
|
defer b.dbCache.RemoveFromNonCached(path)
|
|
|
|
b.dbFilesGuard.Lock()
|
|
defer b.dbFilesGuard.Unlock()
|
|
|
|
if err := shDb.CloseAndRemoveFile(); err != nil {
|
|
return false, err
|
|
}
|
|
b.commondbManager.CleanResources(path)
|
|
if err := b.dropDirectoryIfEmpty(filepath.Dir(path)); err != nil {
|
|
return true, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
|
|
if path == "." {
|
|
return nil
|
|
}
|
|
|
|
sysPath := filepath.Join(b.rootPath, path)
|
|
entries, err := os.ReadDir(sysPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(entries) > 0 {
|
|
return nil
|
|
}
|
|
if err := os.Remove(sysPath); err != nil {
|
|
return err
|
|
}
|
|
return b.dropDirectoryIfEmpty(filepath.Dir(path))
|
|
}
|
|
|
|
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
|
|
var count uint64
|
|
return count, b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
|
|
shDB := b.getBlobovnicza(s)
|
|
blz, err := shDB.Open()
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
defer shDB.Close()
|
|
|
|
incompletedMoves, err := blz.ListMoveInfo(ctx)
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
|
|
for _, move := range incompletedMoves {
|
|
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
|
|
return true, err
|
|
}
|
|
count++
|
|
}
|
|
|
|
return false, nil
|
|
})
|
|
}
|
|
|
|
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
|
move blobovnicza.MoveInfo, metaStore common.MetaStorage,
|
|
) error {
|
|
targetDB := b.getBlobovnicza(NewIDFromBytes(move.TargetStorageID).Path())
|
|
target, err := targetDB.Open()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer targetDB.Close()
|
|
|
|
existsInSource := true
|
|
var gPrm blobovnicza.GetPrm
|
|
gPrm.SetAddress(move.Address)
|
|
gRes, err := source.Get(ctx, gPrm)
|
|
if err != nil {
|
|
if client.IsErrObjectNotFound(err) {
|
|
existsInSource = false
|
|
} else {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !existsInSource { // object was deleted by Rebuild, need to delete move info
|
|
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
|
|
return err
|
|
}
|
|
b.deleteProtectedObjects.Delete(move.Address)
|
|
return nil
|
|
}
|
|
|
|
existsInTarget, err := target.Exists(ctx, move.Address)
|
|
if err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
if !existsInTarget {
|
|
var putPrm blobovnicza.PutPrm
|
|
putPrm.SetAddress(move.Address)
|
|
putPrm.SetMarshaledObject(gRes.Object())
|
|
_, err = target.Put(ctx, putPrm)
|
|
if err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToTargetDB, zap.String("path", targetDB.SystemPath()), zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err = metaStore.UpdateStorageID(ctx, move.Address, move.TargetStorageID); err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", move.Address))
|
|
return err
|
|
}
|
|
|
|
var deletePrm blobovnicza.DeletePrm
|
|
deletePrm.SetAddress(move.Address)
|
|
if _, err = source.Delete(ctx, deletePrm); err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", sourcePath), zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
b.deleteProtectedObjects.Delete(move.Address)
|
|
return nil
|
|
}
|
|
|
|
type moveIterator struct {
|
|
B *Blobovniczas
|
|
ID *ID
|
|
AllFull bool
|
|
Address oid.Address
|
|
ObjectData []byte
|
|
MetaStore common.MetaStorage
|
|
Source *blobovnicza.Blobovnicza
|
|
SourceSysPath string
|
|
}
|
|
|
|
func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, error) {
|
|
target, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
|
|
if err != nil {
|
|
if !isLogical(err) {
|
|
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
|
|
} else {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, zap.Error(err))
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
if target == nil {
|
|
i.B.log.Warn(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
|
|
return false, nil
|
|
}
|
|
defer target.Close()
|
|
|
|
i.AllFull = false
|
|
|
|
targetIDx := u64FromHexString(filepath.Base(target.SystemPath()))
|
|
targetStorageID := NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(targetIDx))))
|
|
|
|
if err = i.Source.PutMoveInfo(ctx, blobovnicza.MoveInfo{
|
|
Address: i.Address,
|
|
TargetStorageID: targetStorageID.Bytes(),
|
|
}); err != nil {
|
|
if !isLogical(err) {
|
|
i.B.reportError(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, err)
|
|
} else {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, zap.String("path", i.SourceSysPath), zap.Error(err))
|
|
}
|
|
return true, nil
|
|
}
|
|
i.B.deleteProtectedObjects.Add(i.Address)
|
|
|
|
var putPrm blobovnicza.PutPrm
|
|
putPrm.SetAddress(i.Address)
|
|
putPrm.SetMarshaledObject(i.ObjectData)
|
|
putPrm.SetForce(true)
|
|
|
|
_, err = target.Blobovnicza().Put(ctx, putPrm)
|
|
if err != nil {
|
|
if !isLogical(err) {
|
|
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
|
|
} else {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, zap.String("path", target.SystemPath()), zap.Error(err))
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
if err = i.MetaStore.UpdateStorageID(ctx, i.Address, targetStorageID.Bytes()); err != nil {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err), zap.Stringer("address", i.Address))
|
|
return true, nil
|
|
}
|
|
|
|
var deletePrm blobovnicza.DeletePrm
|
|
deletePrm.SetAddress(i.Address)
|
|
if _, err = i.Source.Delete(ctx, deletePrm); err != nil {
|
|
if !isLogical(err) {
|
|
i.B.reportError(logs.BlobovniczatreeCouldNotDeleteFromSource, err)
|
|
} else {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", i.SourceSysPath), zap.Error(err))
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
if err = i.Source.DropMoveInfo(ctx, i.Address); err != nil {
|
|
if !isLogical(err) {
|
|
i.B.reportError(logs.BlobovniczatreeCouldNotDropMoveInfo, err)
|
|
} else {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", i.SourceSysPath), zap.Error(err))
|
|
}
|
|
return true, nil
|
|
}
|
|
i.B.deleteProtectedObjects.Delete(i.Address)
|
|
|
|
i.ID = targetStorageID
|
|
return true, nil
|
|
}
|
|
|
|
type addressMap struct {
|
|
data map[oid.Address]struct{}
|
|
guard *sync.RWMutex
|
|
}
|
|
|
|
func newAddressMap() *addressMap {
|
|
return &addressMap{
|
|
data: make(map[oid.Address]struct{}),
|
|
guard: &sync.RWMutex{},
|
|
}
|
|
}
|
|
|
|
func (m *addressMap) Add(address oid.Address) {
|
|
m.guard.Lock()
|
|
defer m.guard.Unlock()
|
|
|
|
m.data[address] = struct{}{}
|
|
}
|
|
|
|
func (m *addressMap) Delete(address oid.Address) {
|
|
m.guard.Lock()
|
|
defer m.guard.Unlock()
|
|
|
|
delete(m.data, address)
|
|
}
|
|
|
|
func (m *addressMap) Contains(address oid.Address) bool {
|
|
m.guard.RLock()
|
|
defer m.guard.RUnlock()
|
|
|
|
_, contains := m.data[address]
|
|
return contains
|
|
}
|