[#661] blobovniczatree: Add Rebuild implementation
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
a531eaf8bc
commit
422226da18
14 changed files with 443 additions and 39 deletions
|
@ -531,4 +531,10 @@ const (
|
|||
BlobovniczaTreeFixingFileExtensionFailed = "failed to fix blobovnicza file extension"
|
||||
BlobstorRebuildFailedToRebuildStorages = "failed to rebuild storages"
|
||||
BlobstorRebuildRebuildStoragesCompleted = "storages rebuild completed"
|
||||
BlobovniczaTreeCollectingDBToRebuild = "collecting blobovniczas to rebuild..."
|
||||
BlobovniczaTreeCollectingDBToRebuildFailed = "collecting blobovniczas to rebuild failed"
|
||||
BlobovniczaTreeCollectingDBToRebuildSuccess = "collecting blobovniczas to rebuild completed successfully"
|
||||
BlobovniczaTreeRebuildingBlobovnicza = "rebuilding blobovnicza..."
|
||||
BlobovniczaTreeRebuildingBlobovniczaFailed = "rebuilding blobovnicza failed"
|
||||
BlobovniczaTreeRebuildingBlobovniczaSuccess = "rebuilding blobovnicza completed successfully"
|
||||
)
|
||||
|
|
|
@ -21,8 +21,8 @@ func (db *activeDB) Close() {
|
|||
db.shDB.Close()
|
||||
}
|
||||
|
||||
func (db *activeDB) Path() string {
|
||||
return db.shDB.Path()
|
||||
func (db *activeDB) SystemPath() string {
|
||||
return db.shDB.SystemPath()
|
||||
}
|
||||
|
||||
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
|
||||
|
@ -192,7 +192,7 @@ func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
|
|||
if !ok {
|
||||
return false, 0
|
||||
}
|
||||
return true, u64FromHexString(filepath.Base(db.Path()))
|
||||
return true, u64FromHexString(filepath.Base(db.SystemPath()))
|
||||
}
|
||||
|
||||
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
|
@ -58,6 +59,8 @@ type Blobovniczas struct {
|
|||
commondbManager *dbManager
|
||||
activeDBManager *activeDBManager
|
||||
dbCache *dbCache
|
||||
dbFilesGuard *sync.RWMutex
|
||||
rebuildGuard *sync.RWMutex
|
||||
}
|
||||
|
||||
var _ common.Storage = (*Blobovniczas)(nil)
|
||||
|
@ -84,6 +87,8 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
|
|||
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
|
||||
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
|
||||
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
|
||||
blz.dbFilesGuard = &sync.RWMutex{}
|
||||
blz.rebuildGuard = &sync.RWMutex{}
|
||||
|
||||
return blz
|
||||
}
|
||||
|
|
|
@ -15,8 +15,9 @@ import (
|
|||
type dbCache struct {
|
||||
cacheGuard *sync.RWMutex
|
||||
cache simplelru.LRUCache[string, *sharedDB]
|
||||
pathLock *utilSync.KeyLocker[string]
|
||||
pathLock *utilSync.KeyLocker[string] // the order of locks is important: pathLock first, cacheGuard second
|
||||
closed bool
|
||||
nonCached map[string]struct{}
|
||||
|
||||
dbManager *dbManager
|
||||
}
|
||||
|
@ -34,6 +35,7 @@ func newDBCache(size int, dbManager *dbManager) *dbCache {
|
|||
cache: cache,
|
||||
dbManager: dbManager,
|
||||
pathLock: utilSync.NewKeyLocker[string](),
|
||||
nonCached: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -59,6 +61,27 @@ func (c *dbCache) GetOrCreate(path string) *sharedDB {
|
|||
return c.create(path)
|
||||
}
|
||||
|
||||
func (c *dbCache) EvictAndMarkNonCached(path string) {
|
||||
c.pathLock.Lock(path)
|
||||
defer c.pathLock.Unlock(path)
|
||||
|
||||
c.cacheGuard.Lock()
|
||||
defer c.cacheGuard.Unlock()
|
||||
|
||||
c.cache.Remove(path)
|
||||
c.nonCached[path] = struct{}{}
|
||||
}
|
||||
|
||||
func (c *dbCache) RemoveFromNonCached(path string) {
|
||||
c.pathLock.Lock(path)
|
||||
defer c.pathLock.Unlock(path)
|
||||
|
||||
c.cacheGuard.Lock()
|
||||
defer c.cacheGuard.Unlock()
|
||||
|
||||
delete(c.nonCached, path)
|
||||
}
|
||||
|
||||
func (c *dbCache) getExisted(path string) *sharedDB {
|
||||
c.cacheGuard.Lock()
|
||||
defer c.cacheGuard.Unlock()
|
||||
|
@ -94,7 +117,9 @@ func (c *dbCache) put(path string, db *sharedDB) bool {
|
|||
c.cacheGuard.Lock()
|
||||
defer c.cacheGuard.Unlock()
|
||||
|
||||
if !c.closed {
|
||||
_, isNonCached := c.nonCached[path]
|
||||
|
||||
if !isNonCached && !c.closed {
|
||||
c.cache.Add(path, db)
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -43,6 +43,12 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
|||
return common.DeleteRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
if b.rebuildGuard.TryRLock() {
|
||||
defer b.rebuildGuard.RUnlock()
|
||||
} else {
|
||||
return common.DeleteRes{}, errRebuildInProgress
|
||||
}
|
||||
|
||||
var bPrm blobovnicza.DeletePrm
|
||||
bPrm.SetAddress(prm.Address)
|
||||
|
||||
|
|
|
@ -181,6 +181,11 @@ func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string
|
|||
}
|
||||
|
||||
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) {
|
||||
if path == "" {
|
||||
b.dbFilesGuard.RLock()
|
||||
defer b.dbFilesGuard.RUnlock()
|
||||
}
|
||||
|
||||
sysPath := filepath.Join(b.rootPath, path)
|
||||
entries, err := os.ReadDir(sysPath)
|
||||
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
|
||||
|
@ -222,6 +227,11 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
|
|||
}
|
||||
|
||||
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
|
||||
if path == "" {
|
||||
b.dbFilesGuard.RLock()
|
||||
defer b.dbFilesGuard.RUnlock()
|
||||
}
|
||||
|
||||
sysPath := filepath.Join(b.rootPath, path)
|
||||
entries, err := os.ReadDir(sysPath)
|
||||
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -12,9 +14,11 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var errClosingClosedBlobovnicza = errors.New("closing closed blobovnicza is not allowed")
|
||||
|
||||
// sharedDB is responsible for opening and closing a file of single blobovnicza.
|
||||
type sharedDB struct {
|
||||
guard *sync.RWMutex
|
||||
cond *sync.Cond
|
||||
blcza *blobovnicza.Blobovnicza
|
||||
refCount uint32
|
||||
|
||||
|
@ -31,8 +35,9 @@ func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
|
|||
metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
|
||||
) *sharedDB {
|
||||
return &sharedDB{
|
||||
guard: &sync.RWMutex{},
|
||||
|
||||
cond: &sync.Cond{
|
||||
L: &sync.RWMutex{},
|
||||
},
|
||||
options: options,
|
||||
path: path,
|
||||
readOnly: readOnly,
|
||||
|
@ -48,8 +53,8 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
|
|||
return nil, errClosed
|
||||
}
|
||||
|
||||
b.guard.Lock()
|
||||
defer b.guard.Unlock()
|
||||
b.cond.L.Lock()
|
||||
defer b.cond.L.Unlock()
|
||||
|
||||
if b.refCount > 0 {
|
||||
b.refCount++
|
||||
|
@ -77,11 +82,12 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
|
|||
}
|
||||
|
||||
func (b *sharedDB) Close() {
|
||||
b.guard.Lock()
|
||||
defer b.guard.Unlock()
|
||||
b.cond.L.Lock()
|
||||
defer b.cond.L.Unlock()
|
||||
|
||||
if b.refCount == 0 {
|
||||
b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
|
||||
b.cond.Broadcast()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -99,9 +105,38 @@ func (b *sharedDB) Close() {
|
|||
}
|
||||
|
||||
b.refCount--
|
||||
if b.refCount == 1 {
|
||||
b.cond.Broadcast()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *sharedDB) Path() string {
|
||||
func (b *sharedDB) CloseAndRemoveFile() error {
|
||||
b.cond.L.Lock()
|
||||
if b.refCount > 1 {
|
||||
b.cond.Wait()
|
||||
}
|
||||
defer b.cond.L.Unlock()
|
||||
|
||||
if b.refCount == 0 {
|
||||
return errClosingClosedBlobovnicza
|
||||
}
|
||||
|
||||
if err := b.blcza.Close(); err != nil {
|
||||
b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
|
||||
zap.String("id", b.path),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err)
|
||||
}
|
||||
|
||||
b.refCount = 0
|
||||
b.blcza = nil
|
||||
b.openDBCounter.Dec()
|
||||
|
||||
return os.Remove(b.path)
|
||||
}
|
||||
|
||||
func (b *sharedDB) SystemPath() string {
|
||||
return b.path
|
||||
}
|
||||
|
||||
|
@ -166,6 +201,13 @@ func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB {
|
|||
return db
|
||||
}
|
||||
|
||||
func (m *levelDbManager) hasAnyDB() bool {
|
||||
m.dbMtx.RLock()
|
||||
defer m.dbMtx.RUnlock()
|
||||
|
||||
return len(m.databases) > 0
|
||||
}
|
||||
|
||||
// dbManager manages the opening and closing of blobovnicza instances.
|
||||
//
|
||||
// The blobovnicza opens at the first request, closes after the last request.
|
||||
|
@ -203,6 +245,17 @@ func (m *dbManager) GetByPath(path string) *sharedDB {
|
|||
return levelManager.GetByIndex(curIndex)
|
||||
}
|
||||
|
||||
func (m *dbManager) CleanResources(path string) {
|
||||
lvlPath := filepath.Dir(path)
|
||||
|
||||
m.levelToManagerGuard.Lock()
|
||||
defer m.levelToManagerGuard.Unlock()
|
||||
|
||||
if result, ok := m.levelToManager[lvlPath]; ok && !result.hasAnyDB() {
|
||||
delete(m.levelToManager, lvlPath)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *dbManager) Open() {
|
||||
m.closedFlag.Store(false)
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package blobovniczatree
|
|||
|
||||
import (
|
||||
"io/fs"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||
|
@ -23,6 +24,7 @@ type cfg struct {
|
|||
// reportError is the function called when encountering disk errors.
|
||||
reportError func(string, error)
|
||||
metrics Metrics
|
||||
waitBeforeDropDB time.Duration
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
@ -32,6 +34,7 @@ const (
|
|||
defaultOpenedCacheSize = 50
|
||||
defaultBlzShallowDepth = 2
|
||||
defaultBlzShallowWidth = 16
|
||||
defaultWaitBeforeDropDB = 10 * time.Second
|
||||
)
|
||||
|
||||
func initConfig(c *cfg) {
|
||||
|
@ -43,6 +46,7 @@ func initConfig(c *cfg) {
|
|||
blzShallowWidth: defaultBlzShallowWidth,
|
||||
reportError: func(string, error) {},
|
||||
metrics: &noopMetrics{},
|
||||
waitBeforeDropDB: defaultWaitBeforeDropDB,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,3 +110,9 @@ func WithMetrics(m Metrics) Option {
|
|||
c.metrics = m
|
||||
}
|
||||
}
|
||||
|
||||
func WithWaitBeforeDropDB(t time.Duration) Option {
|
||||
return func(c *cfg) {
|
||||
c.waitBeforeDropDB = t
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,7 +104,7 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
|
|||
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
|
||||
} else {
|
||||
i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
|
||||
zap.String("path", active.Path()),
|
||||
zap.String("path", active.SystemPath()),
|
||||
zap.String("error", err.Error()),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
|
|||
return false, nil
|
||||
}
|
||||
|
||||
idx := u64FromHexString(filepath.Base(active.Path()))
|
||||
idx := u64FromHexString(filepath.Base(active.SystemPath()))
|
||||
i.ID = NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))
|
||||
|
||||
return true, nil
|
||||
|
|
|
@ -2,10 +2,160 @@ package blobovniczatree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (b *Blobovniczas) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
|
||||
return common.RebuildRes{}, nil
|
||||
var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
|
||||
|
||||
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
|
||||
if b.readOnly {
|
||||
return common.RebuildRes{}, common.ErrReadOnly
|
||||
}
|
||||
|
||||
b.rebuildGuard.Lock()
|
||||
defer b.rebuildGuard.Unlock()
|
||||
|
||||
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
|
||||
var res common.RebuildRes
|
||||
dbsToMigrate, err := b.getDBsToRebuild(ctx)
|
||||
if err != nil {
|
||||
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
|
||||
return res, err
|
||||
}
|
||||
b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
|
||||
for _, db := range dbsToMigrate {
|
||||
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
|
||||
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage)
|
||||
res.ObjectsMoved += movedObjects
|
||||
if err != nil {
|
||||
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
|
||||
return res, err
|
||||
}
|
||||
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
|
||||
res.FilesRemoved++
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
|
||||
dbsToMigrate := make(map[string]struct{})
|
||||
if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
|
||||
dbsToMigrate[s] = struct{}{}
|
||||
return false, nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := b.iterateSortedLeaves(ctx, nil, func(s string) (bool, error) {
|
||||
delete(dbsToMigrate, s)
|
||||
return false, nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := make([]string, 0, len(dbsToMigrate))
|
||||
for db := range dbsToMigrate {
|
||||
result = append(result, db)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage) (uint64, error) {
|
||||
shDB := b.getBlobovnicza(path)
|
||||
blz, err := shDB.Open()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
shDBClosed := false
|
||||
defer func() {
|
||||
if shDBClosed {
|
||||
return
|
||||
}
|
||||
shDB.Close()
|
||||
}()
|
||||
|
||||
migratedObjects, err := b.moveObjects(ctx, blz, meta)
|
||||
if err != nil {
|
||||
return migratedObjects, err
|
||||
}
|
||||
shDBClosed, err = b.dropDB(ctx, path, shDB)
|
||||
return migratedObjects, err
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, meta common.MetaStorage) (uint64, error) {
|
||||
var result uint64
|
||||
|
||||
var prm blobovnicza.IteratePrm
|
||||
prm.DecodeAddresses()
|
||||
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
|
||||
e := b.moveObject(ctx, ie.Address(), ie.ObjectData(), meta)
|
||||
if e == nil {
|
||||
result++
|
||||
}
|
||||
return e
|
||||
})
|
||||
|
||||
_, err := blz.Iterate(ctx, prm)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) moveObject(ctx context.Context, addr oid.Address, data []byte, metaStore common.MetaStorage) error {
|
||||
var pPrm common.PutPrm
|
||||
pPrm.Address = addr
|
||||
pPrm.RawData = data
|
||||
pRes, err := b.Put(ctx, pPrm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return metaStore.UpdateStorageID(ctx, addr, pRes.StorageID)
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false, ctx.Err()
|
||||
case <-time.After(b.waitBeforeDropDB): // to complete requests with old storage ID
|
||||
}
|
||||
|
||||
b.dbCache.EvictAndMarkNonCached(path)
|
||||
defer b.dbCache.RemoveFromNonCached(path)
|
||||
|
||||
b.dbFilesGuard.Lock()
|
||||
defer b.dbFilesGuard.Unlock()
|
||||
|
||||
if err := shDb.CloseAndRemoveFile(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
b.commondbManager.CleanResources(path)
|
||||
if err := b.dropDirectoryIfEmpty(filepath.Dir(path)); err != nil {
|
||||
return true, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
|
||||
if path == "." {
|
||||
return nil
|
||||
}
|
||||
|
||||
sysPath := filepath.Join(b.rootPath, path)
|
||||
entries, err := os.ReadDir(sysPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(entries) > 0 {
|
||||
return nil
|
||||
}
|
||||
if err := os.Remove(sysPath); err != nil {
|
||||
return err
|
||||
}
|
||||
return b.dropDirectoryIfEmpty(filepath.Dir(path))
|
||||
}
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
package blobovniczatree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func TestBlobovniczaTreeRebuild(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("width increased", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 3, false)
|
||||
})
|
||||
|
||||
t.Run("width reduced", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 1, true)
|
||||
})
|
||||
|
||||
t.Run("depth increased", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
testBlobovniczaTreeRebuildHelper(t, 1, 2, 2, 2, true)
|
||||
})
|
||||
|
||||
t.Run("depth reduced", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
testBlobovniczaTreeRebuildHelper(t, 2, 2, 1, 2, true)
|
||||
})
|
||||
}
|
||||
|
||||
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
|
||||
dir := t.TempDir()
|
||||
b := NewBlobovniczaTree(
|
||||
WithLogger(test.NewLogger(t, true)),
|
||||
WithObjectSizeLimit(2048),
|
||||
WithBlobovniczaShallowWidth(sourceWidth),
|
||||
WithBlobovniczaShallowDepth(sourceDepth),
|
||||
WithRootPath(dir),
|
||||
WithBlobovniczaSize(100*1024*1024),
|
||||
WithWaitBeforeDropDB(0),
|
||||
WithOpenedCacheSize(1000))
|
||||
require.NoError(t, b.Open(false))
|
||||
require.NoError(t, b.Init())
|
||||
|
||||
eg, egCtx := errgroup.WithContext(context.Background())
|
||||
storageIDs := make(map[oid.Address][]byte)
|
||||
storageIDsGuard := &sync.Mutex{}
|
||||
for i := 0; i < 1000; i++ {
|
||||
eg.Go(func() error {
|
||||
obj := blobstortest.NewObject(1024)
|
||||
data, err := obj.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var prm common.PutPrm
|
||||
prm.Address = object.AddressOf(obj)
|
||||
prm.RawData = data
|
||||
res, err := b.Put(egCtx, prm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
storageIDsGuard.Lock()
|
||||
storageIDs[prm.Address] = res.StorageID
|
||||
storageIDsGuard.Unlock()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
require.NoError(t, eg.Wait())
|
||||
require.NoError(t, b.Close())
|
||||
|
||||
b = NewBlobovniczaTree(
|
||||
WithLogger(test.NewLogger(t, true)),
|
||||
WithObjectSizeLimit(2048),
|
||||
WithBlobovniczaShallowWidth(targetWidth),
|
||||
WithBlobovniczaShallowDepth(targetDepth),
|
||||
WithRootPath(dir),
|
||||
WithBlobovniczaSize(100*1024*1024),
|
||||
WithWaitBeforeDropDB(0),
|
||||
WithOpenedCacheSize(1000))
|
||||
require.NoError(t, b.Open(false))
|
||||
require.NoError(t, b.Init())
|
||||
|
||||
for addr, storageID := range storageIDs {
|
||||
var gPrm common.GetPrm
|
||||
gPrm.Address = addr
|
||||
gPrm.StorageID = storageID
|
||||
_, err := b.Get(context.Background(), gPrm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
metaStub := &storageIDUpdateStub{
|
||||
storageIDs: storageIDs,
|
||||
}
|
||||
var rPrm common.RebuildPrm
|
||||
rPrm.MetaStorage = metaStub
|
||||
rRes, err := b.Rebuild(context.Background(), rPrm)
|
||||
require.NoError(t, err)
|
||||
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||
require.Equal(t, shouldMigrate, dataMigrated)
|
||||
|
||||
for addr, storageID := range storageIDs {
|
||||
var gPrm common.GetPrm
|
||||
gPrm.Address = addr
|
||||
gPrm.StorageID = storageID
|
||||
_, err := b.Get(context.Background(), gPrm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, b.Close())
|
||||
}
|
||||
|
||||
type storageIDUpdateStub struct {
|
||||
storageIDs map[oid.Address][]byte
|
||||
updatedCount uint64
|
||||
}
|
||||
|
||||
func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
|
||||
s.storageIDs[addr] = storageID
|
||||
s.updatedCount++
|
||||
return nil
|
||||
}
|
|
@ -3,7 +3,7 @@ package common
|
|||
import (
|
||||
"context"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type RebuildRes struct {
|
||||
|
@ -16,5 +16,5 @@ type RebuildPrm struct {
|
|||
}
|
||||
|
||||
type MetaStorage interface {
|
||||
UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error
|
||||
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
|
||||
}
|
||||
|
|
|
@ -5,12 +5,12 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type StorageIDUpdate interface {
|
||||
UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error
|
||||
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
|
||||
}
|
||||
|
||||
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate) error {
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -77,13 +77,20 @@ type mbStorageIDUpdate struct {
|
|||
mb *meta.DB
|
||||
}
|
||||
|
||||
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error {
|
||||
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if u.mb == nil {
|
||||
return errMBIsNotAvailable
|
||||
}
|
||||
var prm meta.PutPrm
|
||||
prm.SetObject(obj)
|
||||
|
||||
var prm meta.UpdateStorageIDPrm
|
||||
prm.SetAddress(addr)
|
||||
prm.SetStorageID(storageID)
|
||||
_, err := u.mb.Put(ctx, prm)
|
||||
_, err := u.mb.UpdateStorageID(ctx, prm)
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue