From 422226da183d762ae86b285a6ad9ab3825a92d83 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 22 Sep 2023 13:07:32 +0300 Subject: [PATCH] [#661] blobovniczatree: Add Rebuild implementation Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 6 + .../blobstor/blobovniczatree/active.go | 6 +- .../blobstor/blobovniczatree/blobovnicza.go | 5 + .../blobstor/blobovniczatree/cache.go | 29 +++- .../blobstor/blobovniczatree/delete.go | 6 + .../blobstor/blobovniczatree/iterate.go | 10 ++ .../blobstor/blobovniczatree/manager.go | 69 +++++++- .../blobstor/blobovniczatree/option.go | 36 ++-- .../blobstor/blobovniczatree/put.go | 4 +- .../blobstor/blobovniczatree/rebuild.go | 154 +++++++++++++++++- .../blobstor/blobovniczatree/rebuild_test.go | 132 +++++++++++++++ .../blobstor/common/rebuild.go | 4 +- pkg/local_object_storage/blobstor/rebuild.go | 4 +- pkg/local_object_storage/shard/rebuilder.go | 17 +- 14 files changed, 443 insertions(+), 39 deletions(-) create mode 100644 pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go diff --git a/internal/logs/logs.go b/internal/logs/logs.go index c1afdadf..6a466720 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -531,4 +531,10 @@ const ( BlobovniczaTreeFixingFileExtensionFailed = "failed to fix blobovnicza file extension" BlobstorRebuildFailedToRebuildStorages = "failed to rebuild storages" BlobstorRebuildRebuildStoragesCompleted = "storages rebuild completed" + BlobovniczaTreeCollectingDBToRebuild = "collecting blobovniczas to rebuild..." + BlobovniczaTreeCollectingDBToRebuildFailed = "collecting blobovniczas to rebuild failed" + BlobovniczaTreeCollectingDBToRebuildSuccess = "collecting blobovniczas to rebuild completed successfully" + BlobovniczaTreeRebuildingBlobovnicza = "rebuilding blobovnicza..." + BlobovniczaTreeRebuildingBlobovniczaFailed = "rebuilding blobovnicza failed" + BlobovniczaTreeRebuildingBlobovniczaSuccess = "rebuilding blobovnicza completed successfully" ) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/active.go b/pkg/local_object_storage/blobstor/blobovniczatree/active.go index da888064..0e349738 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/active.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/active.go @@ -21,8 +21,8 @@ func (db *activeDB) Close() { db.shDB.Close() } -func (db *activeDB) Path() string { - return db.shDB.Path() +func (db *activeDB) SystemPath() string { + return db.shDB.SystemPath() } // activeDBManager manages active blobovnicza instances (that is, those that are being used for Put). @@ -192,7 +192,7 @@ func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) { if !ok { return false, 0 } - return true, u64FromHexString(filepath.Base(db.Path())) + return true, u64FromHexString(filepath.Base(db.SystemPath())) } func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) { diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go index 14acec60..0354fa2a 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/blobovnicza.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" @@ -58,6 +59,8 @@ type Blobovniczas struct { commondbManager *dbManager activeDBManager *activeDBManager dbCache *dbCache + dbFilesGuard *sync.RWMutex + rebuildGuard *sync.RWMutex } var _ common.Storage = (*Blobovniczas)(nil) @@ -84,6 +87,8 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) { blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log) blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth) blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager) + blz.dbFilesGuard = &sync.RWMutex{} + blz.rebuildGuard = &sync.RWMutex{} return blz } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/cache.go b/pkg/local_object_storage/blobstor/blobovniczatree/cache.go index 3f62c49f..317bac5f 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/cache.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/cache.go @@ -15,8 +15,9 @@ import ( type dbCache struct { cacheGuard *sync.RWMutex cache simplelru.LRUCache[string, *sharedDB] - pathLock *utilSync.KeyLocker[string] + pathLock *utilSync.KeyLocker[string] // the order of locks is important: pathLock first, cacheGuard second closed bool + nonCached map[string]struct{} dbManager *dbManager } @@ -34,6 +35,7 @@ func newDBCache(size int, dbManager *dbManager) *dbCache { cache: cache, dbManager: dbManager, pathLock: utilSync.NewKeyLocker[string](), + nonCached: make(map[string]struct{}), } } @@ -59,6 +61,27 @@ func (c *dbCache) GetOrCreate(path string) *sharedDB { return c.create(path) } +func (c *dbCache) EvictAndMarkNonCached(path string) { + c.pathLock.Lock(path) + defer c.pathLock.Unlock(path) + + c.cacheGuard.Lock() + defer c.cacheGuard.Unlock() + + c.cache.Remove(path) + c.nonCached[path] = struct{}{} +} + +func (c *dbCache) RemoveFromNonCached(path string) { + c.pathLock.Lock(path) + defer c.pathLock.Unlock(path) + + c.cacheGuard.Lock() + defer c.cacheGuard.Unlock() + + delete(c.nonCached, path) +} + func (c *dbCache) getExisted(path string) *sharedDB { c.cacheGuard.Lock() defer c.cacheGuard.Unlock() @@ -94,7 +117,9 @@ func (c *dbCache) put(path string, db *sharedDB) bool { c.cacheGuard.Lock() defer c.cacheGuard.Unlock() - if !c.closed { + _, isNonCached := c.nonCached[path] + + if !isNonCached && !c.closed { c.cache.Add(path, db) return true } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/delete.go b/pkg/local_object_storage/blobstor/blobovniczatree/delete.go index 21c21beb..56803c87 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/delete.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/delete.go @@ -43,6 +43,12 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co return common.DeleteRes{}, common.ErrReadOnly } + if b.rebuildGuard.TryRLock() { + defer b.rebuildGuard.RUnlock() + } else { + return common.DeleteRes{}, errRebuildInProgress + } + var bPrm blobovnicza.DeletePrm bPrm.SetAddress(prm.Address) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go index 0522f891..73678795 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go @@ -181,6 +181,11 @@ func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string } func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) { + if path == "" { + b.dbFilesGuard.RLock() + defer b.dbFilesGuard.RUnlock() + } + sysPath := filepath.Join(b.rootPath, path) entries, err := os.ReadDir(sysPath) if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode @@ -222,6 +227,11 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres } func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) { + if path == "" { + b.dbFilesGuard.RLock() + defer b.dbFilesGuard.RUnlock() + } + sysPath := filepath.Join(b.rootPath, path) entries, err := os.ReadDir(sysPath) if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/manager.go b/pkg/local_object_storage/blobstor/blobovniczatree/manager.go index 4dca13ad..4fdde15a 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/manager.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/manager.go @@ -1,7 +1,9 @@ package blobovniczatree import ( + "errors" "fmt" + "os" "path/filepath" "sync" "sync/atomic" @@ -12,9 +14,11 @@ import ( "go.uber.org/zap" ) +var errClosingClosedBlobovnicza = errors.New("closing closed blobovnicza is not allowed") + // sharedDB is responsible for opening and closing a file of single blobovnicza. type sharedDB struct { - guard *sync.RWMutex + cond *sync.Cond blcza *blobovnicza.Blobovnicza refCount uint32 @@ -31,8 +35,9 @@ func newSharedDB(options []blobovnicza.Option, path string, readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger, ) *sharedDB { return &sharedDB{ - guard: &sync.RWMutex{}, - + cond: &sync.Cond{ + L: &sync.RWMutex{}, + }, options: options, path: path, readOnly: readOnly, @@ -48,8 +53,8 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) { return nil, errClosed } - b.guard.Lock() - defer b.guard.Unlock() + b.cond.L.Lock() + defer b.cond.L.Unlock() if b.refCount > 0 { b.refCount++ @@ -77,11 +82,12 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) { } func (b *sharedDB) Close() { - b.guard.Lock() - defer b.guard.Unlock() + b.cond.L.Lock() + defer b.cond.L.Unlock() if b.refCount == 0 { b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path)) + b.cond.Broadcast() return } @@ -99,9 +105,38 @@ func (b *sharedDB) Close() { } b.refCount-- + if b.refCount == 1 { + b.cond.Broadcast() + } } -func (b *sharedDB) Path() string { +func (b *sharedDB) CloseAndRemoveFile() error { + b.cond.L.Lock() + if b.refCount > 1 { + b.cond.Wait() + } + defer b.cond.L.Unlock() + + if b.refCount == 0 { + return errClosingClosedBlobovnicza + } + + if err := b.blcza.Close(); err != nil { + b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza, + zap.String("id", b.path), + zap.String("error", err.Error()), + ) + return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err) + } + + b.refCount = 0 + b.blcza = nil + b.openDBCounter.Dec() + + return os.Remove(b.path) +} + +func (b *sharedDB) SystemPath() string { return b.path } @@ -166,6 +201,13 @@ func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB { return db } +func (m *levelDbManager) hasAnyDB() bool { + m.dbMtx.RLock() + defer m.dbMtx.RUnlock() + + return len(m.databases) > 0 +} + // dbManager manages the opening and closing of blobovnicza instances. // // The blobovnicza opens at the first request, closes after the last request. @@ -203,6 +245,17 @@ func (m *dbManager) GetByPath(path string) *sharedDB { return levelManager.GetByIndex(curIndex) } +func (m *dbManager) CleanResources(path string) { + lvlPath := filepath.Dir(path) + + m.levelToManagerGuard.Lock() + defer m.levelToManagerGuard.Unlock() + + if result, ok := m.levelToManager[lvlPath]; ok && !result.hasAnyDB() { + delete(m.levelToManager, lvlPath) + } +} + func (m *dbManager) Open() { m.closedFlag.Store(false) } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/option.go b/pkg/local_object_storage/blobstor/blobovniczatree/option.go index 561b8376..54c52118 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/option.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/option.go @@ -2,6 +2,7 @@ package blobovniczatree import ( "io/fs" + "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression" @@ -21,28 +22,31 @@ type cfg struct { compression *compression.Config blzOpts []blobovnicza.Option // reportError is the function called when encountering disk errors. - reportError func(string, error) - metrics Metrics + reportError func(string, error) + metrics Metrics + waitBeforeDropDB time.Duration } type Option func(*cfg) const ( - defaultPerm = 0o700 - defaultOpenedCacheSize = 50 - defaultBlzShallowDepth = 2 - defaultBlzShallowWidth = 16 + defaultPerm = 0o700 + defaultOpenedCacheSize = 50 + defaultBlzShallowDepth = 2 + defaultBlzShallowWidth = 16 + defaultWaitBeforeDropDB = 10 * time.Second ) func initConfig(c *cfg) { *c = cfg{ - log: &logger.Logger{Logger: zap.L()}, - perm: defaultPerm, - openedCacheSize: defaultOpenedCacheSize, - blzShallowDepth: defaultBlzShallowDepth, - blzShallowWidth: defaultBlzShallowWidth, - reportError: func(string, error) {}, - metrics: &noopMetrics{}, + log: &logger.Logger{Logger: zap.L()}, + perm: defaultPerm, + openedCacheSize: defaultOpenedCacheSize, + blzShallowDepth: defaultBlzShallowDepth, + blzShallowWidth: defaultBlzShallowWidth, + reportError: func(string, error) {}, + metrics: &noopMetrics{}, + waitBeforeDropDB: defaultWaitBeforeDropDB, } } @@ -106,3 +110,9 @@ func WithMetrics(m Metrics) Option { c.metrics = m } } + +func WithWaitBeforeDropDB(t time.Duration) Option { + return func(c *cfg) { + c.waitBeforeDropDB = t + } +} diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/put.go b/pkg/local_object_storage/blobstor/blobovniczatree/put.go index 8c8697c2..96c4db67 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/put.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/put.go @@ -104,7 +104,7 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error) i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err) } else { i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, - zap.String("path", active.Path()), + zap.String("path", active.SystemPath()), zap.String("error", err.Error()), zap.String("trace_id", tracingPkg.GetTraceID(ctx))) } @@ -112,7 +112,7 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error) return false, nil } - idx := u64FromHexString(filepath.Base(active.Path())) + idx := u64FromHexString(filepath.Base(active.SystemPath())) i.ID = NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx)))) return true, nil diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go index 27c645ab..35e001b6 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go @@ -2,10 +2,160 @@ package blobovniczatree import ( "context" + "errors" + "os" + "path/filepath" + "time" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.uber.org/zap" ) -func (b *Blobovniczas) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) { - return common.RebuildRes{}, nil +var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed") + +func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) { + if b.readOnly { + return common.RebuildRes{}, common.ErrReadOnly + } + + b.rebuildGuard.Lock() + defer b.rebuildGuard.Unlock() + + b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild) + var res common.RebuildRes + dbsToMigrate, err := b.getDBsToRebuild(ctx) + if err != nil { + b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err)) + return res, err + } + b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate))) + for _, db := range dbsToMigrate { + b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db)) + movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage) + res.ObjectsMoved += movedObjects + if err != nil { + b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err)) + return res, err + } + b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects)) + res.FilesRemoved++ + } + + return res, nil +} + +func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) { + dbsToMigrate := make(map[string]struct{}) + if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) { + dbsToMigrate[s] = struct{}{} + return false, nil + }); err != nil { + return nil, err + } + if err := b.iterateSortedLeaves(ctx, nil, func(s string) (bool, error) { + delete(dbsToMigrate, s) + return false, nil + }); err != nil { + return nil, err + } + result := make([]string, 0, len(dbsToMigrate)) + for db := range dbsToMigrate { + result = append(result, db) + } + return result, nil +} + +func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage) (uint64, error) { + shDB := b.getBlobovnicza(path) + blz, err := shDB.Open() + if err != nil { + return 0, err + } + shDBClosed := false + defer func() { + if shDBClosed { + return + } + shDB.Close() + }() + + migratedObjects, err := b.moveObjects(ctx, blz, meta) + if err != nil { + return migratedObjects, err + } + shDBClosed, err = b.dropDB(ctx, path, shDB) + return migratedObjects, err +} + +func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, meta common.MetaStorage) (uint64, error) { + var result uint64 + + var prm blobovnicza.IteratePrm + prm.DecodeAddresses() + prm.SetHandler(func(ie blobovnicza.IterationElement) error { + e := b.moveObject(ctx, ie.Address(), ie.ObjectData(), meta) + if e == nil { + result++ + } + return e + }) + + _, err := blz.Iterate(ctx, prm) + return result, err +} + +func (b *Blobovniczas) moveObject(ctx context.Context, addr oid.Address, data []byte, metaStore common.MetaStorage) error { + var pPrm common.PutPrm + pPrm.Address = addr + pPrm.RawData = data + pRes, err := b.Put(ctx, pPrm) + if err != nil { + return err + } + return metaStore.UpdateStorageID(ctx, addr, pRes.StorageID) +} + +func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + case <-time.After(b.waitBeforeDropDB): // to complete requests with old storage ID + } + + b.dbCache.EvictAndMarkNonCached(path) + defer b.dbCache.RemoveFromNonCached(path) + + b.dbFilesGuard.Lock() + defer b.dbFilesGuard.Unlock() + + if err := shDb.CloseAndRemoveFile(); err != nil { + return false, err + } + b.commondbManager.CleanResources(path) + if err := b.dropDirectoryIfEmpty(filepath.Dir(path)); err != nil { + return true, err + } + return true, nil +} + +func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error { + if path == "." { + return nil + } + + sysPath := filepath.Join(b.rootPath, path) + entries, err := os.ReadDir(sysPath) + if err != nil { + return err + } + if len(entries) > 0 { + return nil + } + if err := os.Remove(sysPath); err != nil { + return err + } + return b.dropDirectoryIfEmpty(filepath.Dir(path)) } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go new file mode 100644 index 00000000..00b38f69 --- /dev/null +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go @@ -0,0 +1,132 @@ +package blobovniczatree + +import ( + "context" + "sync" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func TestBlobovniczaTreeRebuild(t *testing.T) { + t.Parallel() + + t.Run("width increased", func(t *testing.T) { + t.Parallel() + testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 3, false) + }) + + t.Run("width reduced", func(t *testing.T) { + t.Parallel() + testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 1, true) + }) + + t.Run("depth increased", func(t *testing.T) { + t.Parallel() + testBlobovniczaTreeRebuildHelper(t, 1, 2, 2, 2, true) + }) + + t.Run("depth reduced", func(t *testing.T) { + t.Parallel() + testBlobovniczaTreeRebuildHelper(t, 2, 2, 1, 2, true) + }) +} + +func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) { + dir := t.TempDir() + b := NewBlobovniczaTree( + WithLogger(test.NewLogger(t, true)), + WithObjectSizeLimit(2048), + WithBlobovniczaShallowWidth(sourceWidth), + WithBlobovniczaShallowDepth(sourceDepth), + WithRootPath(dir), + WithBlobovniczaSize(100*1024*1024), + WithWaitBeforeDropDB(0), + WithOpenedCacheSize(1000)) + require.NoError(t, b.Open(false)) + require.NoError(t, b.Init()) + + eg, egCtx := errgroup.WithContext(context.Background()) + storageIDs := make(map[oid.Address][]byte) + storageIDsGuard := &sync.Mutex{} + for i := 0; i < 1000; i++ { + eg.Go(func() error { + obj := blobstortest.NewObject(1024) + data, err := obj.Marshal() + if err != nil { + return err + } + var prm common.PutPrm + prm.Address = object.AddressOf(obj) + prm.RawData = data + res, err := b.Put(egCtx, prm) + if err != nil { + return err + } + storageIDsGuard.Lock() + storageIDs[prm.Address] = res.StorageID + storageIDsGuard.Unlock() + return nil + }) + } + + require.NoError(t, eg.Wait()) + require.NoError(t, b.Close()) + + b = NewBlobovniczaTree( + WithLogger(test.NewLogger(t, true)), + WithObjectSizeLimit(2048), + WithBlobovniczaShallowWidth(targetWidth), + WithBlobovniczaShallowDepth(targetDepth), + WithRootPath(dir), + WithBlobovniczaSize(100*1024*1024), + WithWaitBeforeDropDB(0), + WithOpenedCacheSize(1000)) + require.NoError(t, b.Open(false)) + require.NoError(t, b.Init()) + + for addr, storageID := range storageIDs { + var gPrm common.GetPrm + gPrm.Address = addr + gPrm.StorageID = storageID + _, err := b.Get(context.Background(), gPrm) + require.NoError(t, err) + } + + metaStub := &storageIDUpdateStub{ + storageIDs: storageIDs, + } + var rPrm common.RebuildPrm + rPrm.MetaStorage = metaStub + rRes, err := b.Rebuild(context.Background(), rPrm) + require.NoError(t, err) + dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 + require.Equal(t, shouldMigrate, dataMigrated) + + for addr, storageID := range storageIDs { + var gPrm common.GetPrm + gPrm.Address = addr + gPrm.StorageID = storageID + _, err := b.Get(context.Background(), gPrm) + require.NoError(t, err) + } + + require.NoError(t, b.Close()) +} + +type storageIDUpdateStub struct { + storageIDs map[oid.Address][]byte + updatedCount uint64 +} + +func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error { + s.storageIDs[addr] = storageID + s.updatedCount++ + return nil +} diff --git a/pkg/local_object_storage/blobstor/common/rebuild.go b/pkg/local_object_storage/blobstor/common/rebuild.go index 1cbc2cbd..896ecbb3 100644 --- a/pkg/local_object_storage/blobstor/common/rebuild.go +++ b/pkg/local_object_storage/blobstor/common/rebuild.go @@ -3,7 +3,7 @@ package common import ( "context" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) type RebuildRes struct { @@ -16,5 +16,5 @@ type RebuildPrm struct { } type MetaStorage interface { - UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error + UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error } diff --git a/pkg/local_object_storage/blobstor/rebuild.go b/pkg/local_object_storage/blobstor/rebuild.go index 29396da7..882381dc 100644 --- a/pkg/local_object_storage/blobstor/rebuild.go +++ b/pkg/local_object_storage/blobstor/rebuild.go @@ -5,12 +5,12 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) type StorageIDUpdate interface { - UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error + UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error } func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate) error { diff --git a/pkg/local_object_storage/shard/rebuilder.go b/pkg/local_object_storage/shard/rebuilder.go index 3e0c66ab..a636fceb 100644 --- a/pkg/local_object_storage/shard/rebuilder.go +++ b/pkg/local_object_storage/shard/rebuilder.go @@ -9,7 +9,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -77,13 +77,20 @@ type mbStorageIDUpdate struct { mb *meta.DB } -func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error { +func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if u.mb == nil { return errMBIsNotAvailable } - var prm meta.PutPrm - prm.SetObject(obj) + + var prm meta.UpdateStorageIDPrm + prm.SetAddress(addr) prm.SetStorageID(storageID) - _, err := u.mb.Put(ctx, prm) + _, err := u.mb.UpdateStorageID(ctx, prm) return err }