473 lines
13 KiB
Go
473 lines
13 KiB
Go
package blobovniczatree
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// Sentinel errors used by the rebuild code of the blobovnicza tree.
var (
	// errBatchFull is returned by the iteration handler in moveObjects to stop
	// the current pass once the batch holds blzMoveBatchSize objects.
	errBatchFull = errors.New("batch full")
	// errRebuildInProgress reports that an operation cannot be performed
	// while a rebuild is running.
	errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
)
|
|
|
|
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
|
|
if b.readOnly {
|
|
return common.RebuildRes{}, common.ErrReadOnly
|
|
}
|
|
|
|
b.metrics.SetRebuildStatus(rebuildStatusRunning)
|
|
success := true
|
|
defer func() {
|
|
if success {
|
|
b.metrics.SetRebuildStatus(rebuildStatusCompleted)
|
|
} else {
|
|
b.metrics.SetRebuildStatus(rebuildStatusFailed)
|
|
}
|
|
}()
|
|
|
|
b.rebuildGuard.Lock()
|
|
defer b.rebuildGuard.Unlock()
|
|
|
|
var res common.RebuildRes
|
|
|
|
b.log.Debug(logs.BlobovniczaTreeCompletingPreviousRebuild)
|
|
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
|
|
res.ObjectsMoved += completedPreviosMoves
|
|
if err != nil {
|
|
b.log.Warn(logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
|
|
success = false
|
|
return res, err
|
|
}
|
|
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
|
|
|
|
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
|
|
dbsToMigrate, err := b.getDBsToRebuild(ctx)
|
|
if err != nil {
|
|
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
|
|
success = false
|
|
return res, err
|
|
}
|
|
|
|
b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
|
|
res, err = b.migrateDBs(ctx, dbsToMigrate, prm, res)
|
|
if err != nil {
|
|
success = false
|
|
}
|
|
return res, err
|
|
}
|
|
|
|
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
|
|
for _, db := range dbs {
|
|
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
|
|
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
|
|
res.ObjectsMoved += movedObjects
|
|
if err != nil {
|
|
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
|
|
return res, err
|
|
}
|
|
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
|
|
res.FilesRemoved++
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
|
|
dbsToMigrate := make(map[string]struct{})
|
|
if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
|
|
dbsToMigrate[s] = struct{}{}
|
|
return false, nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := b.iterateSortedLeaves(ctx, nil, func(s string) (bool, error) {
|
|
delete(dbsToMigrate, s)
|
|
return false, nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
result := make([]string, 0, len(dbsToMigrate))
|
|
for db := range dbsToMigrate {
|
|
result = append(result, db)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
|
|
shDB := b.getBlobovnicza(path)
|
|
blz, err := shDB.Open()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
shDBClosed := false
|
|
defer func() {
|
|
if shDBClosed {
|
|
return
|
|
}
|
|
shDB.Close()
|
|
}()
|
|
|
|
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
|
|
if err != nil {
|
|
return migratedObjects, err
|
|
}
|
|
shDBClosed, err = b.dropDB(ctx, path, shDB)
|
|
return migratedObjects, err
|
|
}
|
|
|
|
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
|
|
var result atomic.Uint64
|
|
batch := make(map[oid.Address][]byte)
|
|
|
|
var prm blobovnicza.IteratePrm
|
|
prm.DecodeAddresses()
|
|
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
|
|
batch[ie.Address()] = bytes.Clone(ie.ObjectData())
|
|
if len(batch) == b.blzMoveBatchSize {
|
|
return errBatchFull
|
|
}
|
|
return nil
|
|
})
|
|
|
|
for {
|
|
_, err := blz.Iterate(ctx, prm)
|
|
if err != nil && !errors.Is(err, errBatchFull) {
|
|
return result.Load(), err
|
|
}
|
|
|
|
if len(batch) == 0 {
|
|
break
|
|
}
|
|
|
|
eg, egCtx := errgroup.WithContext(ctx)
|
|
|
|
for addr, data := range batch {
|
|
addr := addr
|
|
data := data
|
|
|
|
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
|
|
_ = eg.Wait()
|
|
return result.Load(), err
|
|
}
|
|
eg.Go(func() error {
|
|
defer limiter.ReleaseWorkSlot()
|
|
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
|
|
if err == nil {
|
|
result.Add(1)
|
|
}
|
|
return err
|
|
})
|
|
}
|
|
if err := eg.Wait(); err != nil {
|
|
return result.Load(), err
|
|
}
|
|
|
|
batch = make(map[oid.Address][]byte)
|
|
}
|
|
|
|
return result.Load(), nil
|
|
}
|
|
|
|
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
|
addr oid.Address, data []byte, metaStore common.MetaStorage,
|
|
) error {
|
|
startedAt := time.Now()
|
|
defer func() {
|
|
b.metrics.ObjectMoved(time.Since(startedAt))
|
|
}()
|
|
it := &moveIterator{
|
|
B: b,
|
|
ID: nil,
|
|
AllFull: true,
|
|
Address: addr,
|
|
ObjectData: data,
|
|
MetaStore: metaStore,
|
|
Source: source,
|
|
SourceSysPath: sourcePath,
|
|
}
|
|
|
|
if err := b.iterateDeepest(ctx, addr, func(lvlPath string) (bool, error) { return it.tryMoveToLvl(ctx, lvlPath) }); err != nil {
|
|
return err
|
|
} else if it.ID == nil {
|
|
if it.AllFull {
|
|
return common.ErrNoSpace
|
|
}
|
|
return errPutFailed
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return false, ctx.Err()
|
|
case <-time.After(b.waitBeforeDropDB): // to complete requests with old storage ID
|
|
}
|
|
|
|
b.dbCache.EvictAndMarkNonCached(path)
|
|
defer b.dbCache.RemoveFromNonCached(path)
|
|
|
|
b.dbFilesGuard.Lock()
|
|
defer b.dbFilesGuard.Unlock()
|
|
|
|
if err := shDb.CloseAndRemoveFile(); err != nil {
|
|
return false, err
|
|
}
|
|
b.commondbManager.CleanResources(path)
|
|
if err := b.dropDirectoryIfEmpty(filepath.Dir(path)); err != nil {
|
|
return true, err
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
|
|
if path == "." {
|
|
return nil
|
|
}
|
|
|
|
sysPath := filepath.Join(b.rootPath, path)
|
|
entries, err := os.ReadDir(sysPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(entries) > 0 {
|
|
return nil
|
|
}
|
|
if err := os.Remove(sysPath); err != nil {
|
|
return err
|
|
}
|
|
return b.dropDirectoryIfEmpty(filepath.Dir(path))
|
|
}
|
|
|
|
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
|
|
var count uint64
|
|
return count, b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
|
|
shDB := b.getBlobovnicza(s)
|
|
blz, err := shDB.Open()
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
defer shDB.Close()
|
|
|
|
incompletedMoves, err := blz.ListMoveInfo(ctx)
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
|
|
for _, move := range incompletedMoves {
|
|
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
|
|
return true, err
|
|
}
|
|
count++
|
|
}
|
|
|
|
return false, nil
|
|
})
|
|
}
|
|
|
|
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
|
move blobovnicza.MoveInfo, metaStore common.MetaStorage,
|
|
) error {
|
|
targetDB := b.getBlobovnicza(NewIDFromBytes(move.TargetStorageID).Path())
|
|
target, err := targetDB.Open()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer targetDB.Close()
|
|
|
|
existsInSource := true
|
|
var gPrm blobovnicza.GetPrm
|
|
gPrm.SetAddress(move.Address)
|
|
gRes, err := source.Get(ctx, gPrm)
|
|
if err != nil {
|
|
if client.IsErrObjectNotFound(err) {
|
|
existsInSource = false
|
|
} else {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !existsInSource { // object was deleted by Rebuild, need to delete move info
|
|
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
|
|
return err
|
|
}
|
|
b.deleteProtectedObjects.Delete(move.Address)
|
|
return nil
|
|
}
|
|
|
|
existsInTarget, err := target.Exists(ctx, move.Address)
|
|
if err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
if !existsInTarget {
|
|
var putPrm blobovnicza.PutPrm
|
|
putPrm.SetAddress(move.Address)
|
|
putPrm.SetMarshaledObject(gRes.Object())
|
|
_, err = target.Put(ctx, putPrm)
|
|
if err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToTargetDB, zap.String("path", targetDB.SystemPath()), zap.Error(err))
|
|
return err
|
|
}
|
|
}
|
|
|
|
if err = metaStore.UpdateStorageID(ctx, move.Address, move.TargetStorageID); err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
var deletePrm blobovnicza.DeletePrm
|
|
deletePrm.SetAddress(move.Address)
|
|
if _, err = source.Delete(ctx, deletePrm); err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", sourcePath), zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
|
|
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
b.deleteProtectedObjects.Delete(move.Address)
|
|
return nil
|
|
}
|
|
|
|
type moveIterator struct {
|
|
B *Blobovniczas
|
|
ID *ID
|
|
AllFull bool
|
|
Address oid.Address
|
|
ObjectData []byte
|
|
MetaStore common.MetaStorage
|
|
Source *blobovnicza.Blobovnicza
|
|
SourceSysPath string
|
|
}
|
|
|
|
func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, error) {
|
|
target, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
|
|
if err != nil {
|
|
if !isLogical(err) {
|
|
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
|
|
} else {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, zap.Error(err))
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
if target == nil {
|
|
i.B.log.Warn(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
|
|
return false, nil
|
|
}
|
|
defer target.Close()
|
|
|
|
i.AllFull = false
|
|
|
|
targetIDx := u64FromHexString(filepath.Base(target.SystemPath()))
|
|
targetStorageID := NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(targetIDx))))
|
|
|
|
if err = i.Source.PutMoveInfo(ctx, blobovnicza.MoveInfo{
|
|
Address: i.Address,
|
|
TargetStorageID: targetStorageID.Bytes(),
|
|
}); err != nil {
|
|
if !isLogical(err) {
|
|
i.B.reportError(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, err)
|
|
} else {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, zap.String("path", i.SourceSysPath), zap.Error(err))
|
|
}
|
|
return true, nil
|
|
}
|
|
i.B.deleteProtectedObjects.Add(i.Address)
|
|
|
|
var putPrm blobovnicza.PutPrm
|
|
putPrm.SetAddress(i.Address)
|
|
putPrm.SetMarshaledObject(i.ObjectData)
|
|
|
|
_, err = target.Blobovnicza().Put(ctx, putPrm)
|
|
if err != nil {
|
|
if !isLogical(err) {
|
|
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
|
|
} else {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, zap.String("path", target.SystemPath()), zap.Error(err))
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
if err = i.MetaStore.UpdateStorageID(ctx, i.Address, targetStorageID.Bytes()); err != nil {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err))
|
|
return true, nil
|
|
}
|
|
|
|
var deletePrm blobovnicza.DeletePrm
|
|
deletePrm.SetAddress(i.Address)
|
|
if _, err = i.Source.Delete(ctx, deletePrm); err != nil {
|
|
if !isLogical(err) {
|
|
i.B.reportError(logs.BlobovniczatreeCouldNotDeleteFromSource, err)
|
|
} else {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", i.SourceSysPath), zap.Error(err))
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
if err = i.Source.DropMoveInfo(ctx, i.Address); err != nil {
|
|
if !isLogical(err) {
|
|
i.B.reportError(logs.BlobovniczatreeCouldNotDropMoveInfo, err)
|
|
} else {
|
|
i.B.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", i.SourceSysPath), zap.Error(err))
|
|
}
|
|
return true, nil
|
|
}
|
|
i.B.deleteProtectedObjects.Delete(i.Address)
|
|
|
|
i.ID = targetStorageID
|
|
return true, nil
|
|
}
|
|
|
|
type addressMap struct {
|
|
data map[oid.Address]struct{}
|
|
guard *sync.RWMutex
|
|
}
|
|
|
|
func newAddressMap() *addressMap {
|
|
return &addressMap{
|
|
data: make(map[oid.Address]struct{}),
|
|
guard: &sync.RWMutex{},
|
|
}
|
|
}
|
|
|
|
func (m *addressMap) Add(address oid.Address) {
|
|
m.guard.Lock()
|
|
defer m.guard.Unlock()
|
|
|
|
m.data[address] = struct{}{}
|
|
}
|
|
|
|
func (m *addressMap) Delete(address oid.Address) {
|
|
m.guard.Lock()
|
|
defer m.guard.Unlock()
|
|
|
|
delete(m.data, address)
|
|
}
|
|
|
|
func (m *addressMap) Contains(address oid.Address) bool {
|
|
m.guard.RLock()
|
|
defer m.guard.RUnlock()
|
|
|
|
_, contains := m.data[address]
|
|
return contains
|
|
}
|