forked from TrueCloudLab/frostfs-node
[#661] blobovniczatree: Make Rebuild failover safe
Now move info stores in blobovnicza, so in case of failover rebuild completes previous operation first. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
da4fee2d0b
commit
b2769ca3de
13 changed files with 615 additions and 35 deletions
|
@ -5,11 +5,13 @@ import (
|
|||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -24,8 +26,18 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
|
|||
b.rebuildGuard.Lock()
|
||||
defer b.rebuildGuard.Unlock()
|
||||
|
||||
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
|
||||
var res common.RebuildRes
|
||||
|
||||
b.log.Debug(logs.BlobovniczaTreeCompletingPreviousRebuild)
|
||||
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
|
||||
res.ObjectsMoved += completedPreviosMoves
|
||||
if err != nil {
|
||||
b.log.Warn(logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
|
||||
return res, err
|
||||
}
|
||||
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
|
||||
|
||||
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
|
||||
dbsToMigrate, err := b.getDBsToRebuild(ctx)
|
||||
if err != nil {
|
||||
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
|
||||
|
@ -82,7 +94,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
|
|||
shDB.Close()
|
||||
}()
|
||||
|
||||
migratedObjects, err := b.moveObjects(ctx, blz, meta)
|
||||
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta)
|
||||
if err != nil {
|
||||
return migratedObjects, err
|
||||
}
|
||||
|
@ -90,13 +102,13 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
|
|||
return migratedObjects, err
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, meta common.MetaStorage) (uint64, error) {
|
||||
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage) (uint64, error) {
|
||||
var result uint64
|
||||
|
||||
var prm blobovnicza.IteratePrm
|
||||
prm.DecodeAddresses()
|
||||
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
|
||||
e := b.moveObject(ctx, ie.Address(), ie.ObjectData(), meta)
|
||||
e := b.moveObject(ctx, blz, blzPath, ie.Address(), ie.ObjectData(), meta)
|
||||
if e == nil {
|
||||
result++
|
||||
}
|
||||
|
@ -107,15 +119,28 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
|
|||
return result, err
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) moveObject(ctx context.Context, addr oid.Address, data []byte, metaStore common.MetaStorage) error {
|
||||
var pPrm common.PutPrm
|
||||
pPrm.Address = addr
|
||||
pPrm.RawData = data
|
||||
pRes, err := b.Put(ctx, pPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
||||
addr oid.Address, data []byte, metaStore common.MetaStorage) error {
|
||||
it := &moveIterator{
|
||||
B: b,
|
||||
ID: nil,
|
||||
AllFull: true,
|
||||
Address: addr,
|
||||
ObjectData: data,
|
||||
MetaStore: metaStore,
|
||||
Source: source,
|
||||
SourceSysPath: sourcePath,
|
||||
}
|
||||
return metaStore.UpdateStorageID(ctx, addr, pRes.StorageID)
|
||||
|
||||
if err := b.iterateDeepest(ctx, addr, func(lvlPath string) (bool, error) { return it.tryMoveToLvl(ctx, lvlPath) }); err != nil {
|
||||
return err
|
||||
} else if it.ID == nil {
|
||||
if it.AllFull {
|
||||
return common.ErrNoSpace
|
||||
}
|
||||
return errPutFailed
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
|
||||
|
@ -159,3 +184,222 @@ func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
|
|||
}
|
||||
return b.dropDirectoryIfEmpty(filepath.Dir(path))
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
|
||||
var count uint64
|
||||
return count, b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
|
||||
shDB := b.getBlobovnicza(s)
|
||||
blz, err := shDB.Open()
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
defer shDB.Close()
|
||||
|
||||
incompletedMoves, err := blz.ListMoveInfo(ctx)
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
|
||||
for _, move := range incompletedMoves {
|
||||
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
|
||||
return true, err
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
||||
move blobovnicza.MoveInfo, metaStore common.MetaStorage) error {
|
||||
targetDB := b.getBlobovnicza(NewIDFromBytes(move.TargetStorageID).Path())
|
||||
target, err := targetDB.Open()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer targetDB.Close()
|
||||
|
||||
existsInSource := true
|
||||
var gPrm blobovnicza.GetPrm
|
||||
gPrm.SetAddress(move.Address)
|
||||
gRes, err := source.Get(ctx, gPrm)
|
||||
if err != nil {
|
||||
if client.IsErrObjectNotFound(err) {
|
||||
existsInSource = false
|
||||
} else {
|
||||
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if !existsInSource { //object was deleted by Rebuild, need to delete move info
|
||||
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
|
||||
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
b.deleteProtectedObjects.Delete(move.Address)
|
||||
return nil
|
||||
}
|
||||
|
||||
existsInTarget, err := target.Exists(ctx, move.Address)
|
||||
if err != nil {
|
||||
b.log.Warn(logs.BlobovniczatreeCouldNotCheckExistenceInTargetDB, zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if !existsInTarget {
|
||||
var putPrm blobovnicza.PutPrm
|
||||
putPrm.SetAddress(move.Address)
|
||||
putPrm.SetMarshaledObject(gRes.Object())
|
||||
_, err = target.Put(ctx, putPrm)
|
||||
if err != nil {
|
||||
b.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToTargetDB, zap.String("path", targetDB.SystemPath()), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = metaStore.UpdateStorageID(ctx, move.Address, move.TargetStorageID); err != nil {
|
||||
b.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
var deletePrm blobovnicza.DeletePrm
|
||||
deletePrm.SetAddress(move.Address)
|
||||
if _, err = source.Delete(ctx, deletePrm); err != nil {
|
||||
b.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", sourcePath), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
|
||||
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
b.deleteProtectedObjects.Delete(move.Address)
|
||||
return nil
|
||||
}
|
||||
|
||||
type moveIterator struct {
|
||||
B *Blobovniczas
|
||||
ID *ID
|
||||
AllFull bool
|
||||
Address oid.Address
|
||||
ObjectData []byte
|
||||
MetaStore common.MetaStorage
|
||||
Source *blobovnicza.Blobovnicza
|
||||
SourceSysPath string
|
||||
}
|
||||
|
||||
func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, error) {
|
||||
target, err := i.B.activeDBManager.GetOpenedActiveDBForLevel(lvlPath)
|
||||
if err != nil {
|
||||
if !isLogical(err) {
|
||||
i.B.reportError(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, err)
|
||||
} else {
|
||||
i.B.log.Warn(logs.BlobovniczatreeCouldNotGetActiveBlobovnicza, zap.Error(err))
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if target == nil {
|
||||
i.B.log.Warn(logs.BlobovniczatreeBlobovniczaOverflowed, zap.String("level", lvlPath))
|
||||
return false, nil
|
||||
}
|
||||
defer target.Close()
|
||||
|
||||
i.AllFull = false
|
||||
|
||||
targetIDx := u64FromHexString(filepath.Base(target.SystemPath()))
|
||||
targetStorageID := NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(targetIDx))))
|
||||
|
||||
if err = i.Source.PutMoveInfo(ctx, blobovnicza.MoveInfo{
|
||||
Address: i.Address,
|
||||
TargetStorageID: targetStorageID.Bytes(),
|
||||
}); err != nil {
|
||||
if !isLogical(err) {
|
||||
i.B.reportError(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, err)
|
||||
} else {
|
||||
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutMoveInfoToSourceBlobovnicza, zap.String("path", i.SourceSysPath), zap.Error(err))
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
i.B.deleteProtectedObjects.Add(i.Address)
|
||||
|
||||
var putPrm blobovnicza.PutPrm
|
||||
putPrm.SetAddress(i.Address)
|
||||
putPrm.SetMarshaledObject(i.ObjectData)
|
||||
|
||||
_, err = target.Blobovnicza().Put(ctx, putPrm)
|
||||
if err != nil {
|
||||
if !isLogical(err) {
|
||||
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
|
||||
} else {
|
||||
i.B.log.Warn(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, zap.String("path", target.SystemPath()), zap.Error(err))
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if err = i.MetaStore.UpdateStorageID(ctx, i.Address, targetStorageID.Bytes()); err != nil {
|
||||
i.B.log.Warn(logs.BlobovniczatreeCouldNotUpdateStorageID, zap.Error(err))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
var deletePrm blobovnicza.DeletePrm
|
||||
deletePrm.SetAddress(i.Address)
|
||||
if _, err = i.Source.Delete(ctx, deletePrm); err != nil {
|
||||
if !isLogical(err) {
|
||||
i.B.reportError(logs.BlobovniczatreeCouldNotDeleteFromSource, err)
|
||||
} else {
|
||||
i.B.log.Warn(logs.BlobovniczatreeCouldNotDeleteFromSource, zap.String("path", i.SourceSysPath), zap.Error(err))
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if err = i.Source.DropMoveInfo(ctx, i.Address); err != nil {
|
||||
if !isLogical(err) {
|
||||
i.B.reportError(logs.BlobovniczatreeCouldNotDropMoveInfo, err)
|
||||
} else {
|
||||
i.B.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", i.SourceSysPath), zap.Error(err))
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
i.B.deleteProtectedObjects.Delete(i.Address)
|
||||
|
||||
i.ID = targetStorageID
|
||||
return true, nil
|
||||
}
|
||||
|
||||
type addressMap struct {
|
||||
data map[oid.Address]struct{}
|
||||
guard *sync.RWMutex
|
||||
}
|
||||
|
||||
func newAddressMap() *addressMap {
|
||||
return &addressMap{
|
||||
data: make(map[oid.Address]struct{}),
|
||||
guard: &sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *addressMap) Add(address oid.Address) {
|
||||
m.guard.Lock()
|
||||
defer m.guard.Unlock()
|
||||
|
||||
m.data[address] = struct{}{}
|
||||
}
|
||||
|
||||
func (m *addressMap) Delete(address oid.Address) {
|
||||
m.guard.Lock()
|
||||
defer m.guard.Unlock()
|
||||
|
||||
delete(m.data, address)
|
||||
}
|
||||
|
||||
func (m *addressMap) Contains(address oid.Address) bool {
|
||||
m.guard.RLock()
|
||||
defer m.guard.RUnlock()
|
||||
|
||||
_, contains := m.data[address]
|
||||
return contains
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue