[#1337] blobovniczatree: Add .rebuild temp files
This allows to reduce open/close DBs to check incompleted rebuilds. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
a685fcdc96
commit
4eebb43fa6
6 changed files with 71 additions and 9 deletions
|
@ -543,4 +543,5 @@ const (
|
||||||
WritecacheSealCompletedAsync = "writecache seal completed successfully"
|
WritecacheSealCompletedAsync = "writecache seal completed successfully"
|
||||||
FailedToSealWritecacheAsync = "failed to seal writecache async"
|
FailedToSealWritecacheAsync = "failed to seal writecache async"
|
||||||
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty"
|
WritecacheShrinkSkippedNotEmpty = "writecache shrink skipped: database is not empty"
|
||||||
|
BlobovniczatreeFailedToRemoveRebuildTempFile = "failed to remove rebuild temp file"
|
||||||
)
|
)
|
||||||
|
|
|
@ -135,7 +135,7 @@ func getBlobovniczaMaxIndex(directory string) (bool, uint64, error) {
|
||||||
var hasDBs bool
|
var hasDBs bool
|
||||||
var maxIdx uint64
|
var maxIdx uint64
|
||||||
for _, e := range entries {
|
for _, e := range entries {
|
||||||
if e.IsDir() {
|
if e.IsDir() || strings.HasSuffix(e.Name(), rebuildSuffix) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
hasDBs = true
|
hasDBs = true
|
||||||
|
|
|
@ -2,6 +2,7 @@ package blobovniczatree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
@ -41,10 +42,9 @@ func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
|
||||||
|
|
||||||
eg, egCtx := errgroup.WithContext(ctx)
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
eg.SetLimit(b.blzInitWorkerCount)
|
eg.SetLimit(b.blzInitWorkerCount)
|
||||||
visited := make(map[string]struct{})
|
err = b.iterateIncompletedRebuildDBPaths(egCtx, func(p string) (bool, error) {
|
||||||
err = b.iterateExistingDBPaths(egCtx, func(p string) (bool, error) {
|
|
||||||
visited[p] = struct{}{}
|
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
|
p = strings.TrimSuffix(p, rebuildSuffix)
|
||||||
shBlz := b.getBlobovniczaWithoutCaching(p)
|
shBlz := b.getBlobovniczaWithoutCaching(p)
|
||||||
blz, err := shBlz.Open()
|
blz, err := shBlz.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -188,11 +188,11 @@ func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string
|
||||||
b.dbFilesGuard.RLock()
|
b.dbFilesGuard.RLock()
|
||||||
defer b.dbFilesGuard.RUnlock()
|
defer b.dbFilesGuard.RUnlock()
|
||||||
|
|
||||||
_, err := b.iterateExistingDBPathsDFS(ctx, "", f)
|
_, err := b.iterateExistingDBPathsDFS(ctx, "", f, func(path string) bool { return !strings.HasSuffix(path, rebuildSuffix) })
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) {
|
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error), fileFilter func(path string) bool) (bool, error) {
|
||||||
sysPath := filepath.Join(b.rootPath, path)
|
sysPath := filepath.Join(b.rootPath, path)
|
||||||
entries, err := os.ReadDir(sysPath)
|
entries, err := os.ReadDir(sysPath)
|
||||||
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
|
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
|
||||||
|
@ -208,7 +208,7 @@ func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path strin
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
if entry.IsDir() {
|
if entry.IsDir() {
|
||||||
stop, err := b.iterateExistingDBPathsDFS(ctx, filepath.Join(path, entry.Name()), f)
|
stop, err := b.iterateExistingDBPathsDFS(ctx, filepath.Join(path, entry.Name()), f, fileFilter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -216,6 +216,9 @@ func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path strin
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if !fileFilter(entry.Name()) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
stop, err := f(filepath.Join(path, entry.Name()))
|
stop, err := f(filepath.Join(path, entry.Name()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
|
@ -228,6 +231,15 @@ func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path strin
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// iterateIncompletedRebuildDBPaths iterates over the paths of Blobovniczas with incompleted rebuild files without any order.
|
||||||
|
func (b *Blobovniczas) iterateIncompletedRebuildDBPaths(ctx context.Context, f func(string) (bool, error)) error {
|
||||||
|
b.dbFilesGuard.RLock()
|
||||||
|
defer b.dbFilesGuard.RUnlock()
|
||||||
|
|
||||||
|
_, err := b.iterateExistingDBPathsDFS(ctx, "", f, func(path string) bool { return strings.HasSuffix(path, rebuildSuffix) })
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error {
|
func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error {
|
||||||
b.dbFilesGuard.RLock()
|
b.dbFilesGuard.RLock()
|
||||||
defer b.dbFilesGuard.RUnlock()
|
defer b.dbFilesGuard.RUnlock()
|
||||||
|
@ -249,6 +261,9 @@ func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path st
|
||||||
var dirIdxs []uint64
|
var dirIdxs []uint64
|
||||||
|
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
|
if strings.HasSuffix(entry.Name(), rebuildSuffix) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
idx := u64FromHexString(entry.Name())
|
idx := u64FromHexString(entry.Name())
|
||||||
if entry.IsDir() {
|
if entry.IsDir() {
|
||||||
dirIdxs = append(dirIdxs, idx)
|
dirIdxs = append(dirIdxs, idx)
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
@ -19,6 +20,8 @@ import (
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const rebuildSuffix = ".rebuild"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
|
errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
|
||||||
errBatchFull = errors.New("batch full")
|
errBatchFull = errors.New("batch full")
|
||||||
|
@ -124,15 +127,36 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
|
||||||
}
|
}
|
||||||
shDB.Close()
|
shDB.Close()
|
||||||
}()
|
}()
|
||||||
|
dropTempFile, err := b.addRebuildTempFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
|
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return migratedObjects, err
|
return migratedObjects, err
|
||||||
}
|
}
|
||||||
shDBClosed, err = b.dropDB(ctx, path, shDB)
|
shDBClosed, err = b.dropDB(ctx, path, shDB)
|
||||||
|
if err == nil {
|
||||||
|
// drop only on success to continue rebuild on error
|
||||||
|
dropTempFile()
|
||||||
|
}
|
||||||
return migratedObjects, err
|
return migratedObjects, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) addRebuildTempFile(path string) (func(), error) {
|
||||||
|
sysPath := filepath.Join(b.rootPath, path)
|
||||||
|
sysPath = sysPath + rebuildSuffix
|
||||||
|
_, err := os.OpenFile(sysPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, b.perm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return func() {
|
||||||
|
if err := os.Remove(sysPath); err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
|
||||||
|
}
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
|
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
|
||||||
var result atomic.Uint64
|
var result atomic.Uint64
|
||||||
batch := make(map[oid.Address][]byte)
|
batch := make(map[oid.Address][]byte)
|
||||||
|
@ -256,7 +280,10 @@ func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
|
||||||
|
|
||||||
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
|
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
|
||||||
var count uint64
|
var count uint64
|
||||||
return count, b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
|
var rebuildTempFilesToRemove []string
|
||||||
|
err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) {
|
||||||
|
rebuildTmpFilePath := s
|
||||||
|
s = strings.TrimSuffix(s, rebuildSuffix)
|
||||||
shDB := b.getBlobovnicza(s)
|
shDB := b.getBlobovnicza(s)
|
||||||
blz, err := shDB.Open()
|
blz, err := shDB.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -276,8 +303,15 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rebuildTempFilesToRemove = append(rebuildTempFilesToRemove, rebuildTmpFilePath)
|
||||||
return false, nil
|
return false, nil
|
||||||
})
|
})
|
||||||
|
for _, tmp := range rebuildTempFilesToRemove {
|
||||||
|
if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return count, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
||||||
|
|
|
@ -3,6 +3,7 @@ package blobovniczatree
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -53,6 +54,8 @@ func testRebuildFailoverOnlyMoveInfoSaved(t *testing.T) {
|
||||||
}))
|
}))
|
||||||
|
|
||||||
require.NoError(t, blz.Close())
|
require.NoError(t, blz.Close())
|
||||||
|
_, err = os.OpenFile(filepath.Join(dir, "0", "0", "1.db.rebuild"), os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, defaultPerm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
testRebuildFailoverValidate(t, dir, obj, true)
|
testRebuildFailoverValidate(t, dir, obj, true)
|
||||||
}
|
}
|
||||||
|
@ -82,6 +85,9 @@ func testRebuildFailoverObjectSavedToTarget(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, blz.Close())
|
require.NoError(t, blz.Close())
|
||||||
|
|
||||||
|
_, err = os.OpenFile(filepath.Join(dir, "0", "0", "1.db.rebuild"), os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, defaultPerm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
|
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
|
||||||
require.NoError(t, blz.Open())
|
require.NoError(t, blz.Open())
|
||||||
require.NoError(t, blz.Init())
|
require.NoError(t, blz.Init())
|
||||||
|
@ -113,6 +119,9 @@ func testRebuildFailoverObjectDeletedFromSource(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, blz.Close())
|
require.NoError(t, blz.Close())
|
||||||
|
|
||||||
|
_, err = os.OpenFile(filepath.Join(dir, "0", "0", "1.db.rebuild"), os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, defaultPerm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
|
blz = blobovnicza.New(blobovnicza.WithPath(filepath.Join(dir, "0", "0", "0.db")))
|
||||||
require.NoError(t, blz.Open())
|
require.NoError(t, blz.Open())
|
||||||
require.NoError(t, blz.Init())
|
require.NoError(t, blz.Init())
|
||||||
|
@ -194,4 +203,7 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
require.NoError(t, blz.Close())
|
require.NoError(t, blz.Close())
|
||||||
|
|
||||||
|
_, err = os.Stat(filepath.Join(dir, "0", "0", "1.db.rebuild"))
|
||||||
|
require.True(t, os.IsNotExist(err))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue