[#661] blobovniczatree: Add Rebuild implementation
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
b22b703325
commit
9ef4a885de
14 changed files with 445 additions and 39 deletions
|
@ -524,4 +524,10 @@ const (
|
||||||
BlobovniczaTreeFixingFileExtensionFailed = "failed to fix blobovnicza file extension"
|
BlobovniczaTreeFixingFileExtensionFailed = "failed to fix blobovnicza file extension"
|
||||||
BlobstorRebuildFailedToRebuildStorages = "failed to rebuild storages"
|
BlobstorRebuildFailedToRebuildStorages = "failed to rebuild storages"
|
||||||
BlobstorRebuildRebuildStoragesCompleted = "storages rebuild completed"
|
BlobstorRebuildRebuildStoragesCompleted = "storages rebuild completed"
|
||||||
|
BlobovniczaTreeCollectingDBToRebuild = "collecting blobovniczas to rebuild..."
|
||||||
|
BlobovniczaTreeCollectingDBToRebuildFailed = "collecting blobovniczas to rebuild failed"
|
||||||
|
BlobovniczaTreeCollectingDBToRebuildSuccess = "collecting blobovniczas to rebuild completed successfully"
|
||||||
|
BlobovniczaTreeRebuildingBlobovnicza = "rebuilding blobovnicza..."
|
||||||
|
BlobovniczaTreeRebuildingBlobovniczaFailed = "rebuilding blobovnicza failed"
|
||||||
|
BlobovniczaTreeRebuildingBlobovniczaSuccess = "rebuilding blobovnicza completed successfully"
|
||||||
)
|
)
|
||||||
|
|
|
@ -21,8 +21,8 @@ func (db *activeDB) Close() {
|
||||||
db.shDB.Close()
|
db.shDB.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *activeDB) Path() string {
|
func (db *activeDB) SystemPath() string {
|
||||||
return db.shDB.Path()
|
return db.shDB.SystemPath()
|
||||||
}
|
}
|
||||||
|
|
||||||
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
|
// activeDBManager manages active blobovnicza instances (that is, those that are being used for Put).
|
||||||
|
@ -192,7 +192,7 @@ func (m *activeDBManager) hasActiveDB(lvlPath string) (bool, uint64) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return false, 0
|
return false, 0
|
||||||
}
|
}
|
||||||
return true, u64FromHexString(filepath.Base(db.Path()))
|
return true, u64FromHexString(filepath.Base(db.SystemPath()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {
|
func (m *activeDBManager) replace(lvlPath string, shDB *sharedDB) (*sharedDB, bool) {
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||||
|
@ -58,6 +59,8 @@ type Blobovniczas struct {
|
||||||
commondbManager *dbManager
|
commondbManager *dbManager
|
||||||
activeDBManager *activeDBManager
|
activeDBManager *activeDBManager
|
||||||
dbCache *dbCache
|
dbCache *dbCache
|
||||||
|
dbFilesGuard *sync.RWMutex
|
||||||
|
rebuildGuard *sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ common.Storage = (*Blobovniczas)(nil)
|
var _ common.Storage = (*Blobovniczas)(nil)
|
||||||
|
@ -84,6 +87,8 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
|
||||||
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
|
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
|
||||||
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
|
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
|
||||||
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
|
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)
|
||||||
|
blz.dbFilesGuard = &sync.RWMutex{}
|
||||||
|
blz.rebuildGuard = &sync.RWMutex{}
|
||||||
|
|
||||||
return blz
|
return blz
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,8 +15,9 @@ import (
|
||||||
type dbCache struct {
|
type dbCache struct {
|
||||||
cacheGuard *sync.RWMutex
|
cacheGuard *sync.RWMutex
|
||||||
cache simplelru.LRUCache[string, *sharedDB]
|
cache simplelru.LRUCache[string, *sharedDB]
|
||||||
pathLock *utilSync.KeyLocker[string]
|
pathLock *utilSync.KeyLocker[string] // the order of locks is important: pathLock first, cacheGuard second
|
||||||
closed bool
|
closed bool
|
||||||
|
nonCached map[string]struct{}
|
||||||
|
|
||||||
dbManager *dbManager
|
dbManager *dbManager
|
||||||
}
|
}
|
||||||
|
@ -34,6 +35,7 @@ func newDBCache(size int, dbManager *dbManager) *dbCache {
|
||||||
cache: cache,
|
cache: cache,
|
||||||
dbManager: dbManager,
|
dbManager: dbManager,
|
||||||
pathLock: utilSync.NewKeyLocker[string](),
|
pathLock: utilSync.NewKeyLocker[string](),
|
||||||
|
nonCached: make(map[string]struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,6 +61,27 @@ func (c *dbCache) GetOrCreate(path string) *sharedDB {
|
||||||
return c.create(path)
|
return c.create(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *dbCache) EvictAndMarkNonCached(path string) {
|
||||||
|
c.pathLock.Lock(path)
|
||||||
|
defer c.pathLock.Unlock(path)
|
||||||
|
|
||||||
|
c.cacheGuard.Lock()
|
||||||
|
defer c.cacheGuard.Unlock()
|
||||||
|
|
||||||
|
c.cache.Remove(path)
|
||||||
|
c.nonCached[path] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *dbCache) RemoveFromNonCached(path string) {
|
||||||
|
c.pathLock.Lock(path)
|
||||||
|
defer c.pathLock.Unlock(path)
|
||||||
|
|
||||||
|
c.cacheGuard.Lock()
|
||||||
|
defer c.cacheGuard.Unlock()
|
||||||
|
|
||||||
|
delete(c.nonCached, path)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *dbCache) getExisted(path string) *sharedDB {
|
func (c *dbCache) getExisted(path string) *sharedDB {
|
||||||
c.cacheGuard.Lock()
|
c.cacheGuard.Lock()
|
||||||
defer c.cacheGuard.Unlock()
|
defer c.cacheGuard.Unlock()
|
||||||
|
@ -94,7 +117,9 @@ func (c *dbCache) put(path string, db *sharedDB) bool {
|
||||||
c.cacheGuard.Lock()
|
c.cacheGuard.Lock()
|
||||||
defer c.cacheGuard.Unlock()
|
defer c.cacheGuard.Unlock()
|
||||||
|
|
||||||
if !c.closed {
|
_, isNonCached := c.nonCached[path]
|
||||||
|
|
||||||
|
if !isNonCached && !c.closed {
|
||||||
c.cache.Add(path, db)
|
c.cache.Add(path, db)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,12 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
||||||
return common.DeleteRes{}, common.ErrReadOnly
|
return common.DeleteRes{}, common.ErrReadOnly
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if b.rebuildGuard.TryRLock() {
|
||||||
|
defer b.rebuildGuard.RUnlock()
|
||||||
|
} else {
|
||||||
|
return common.DeleteRes{}, errRebuildInProgress
|
||||||
|
}
|
||||||
|
|
||||||
var bPrm blobovnicza.DeletePrm
|
var bPrm blobovnicza.DeletePrm
|
||||||
bPrm.SetAddress(prm.Address)
|
bPrm.SetAddress(prm.Address)
|
||||||
|
|
||||||
|
|
|
@ -175,6 +175,11 @@ func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) {
|
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) {
|
||||||
|
if path == "" {
|
||||||
|
b.dbFilesGuard.RLock()
|
||||||
|
defer b.dbFilesGuard.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
sysPath := filepath.Join(b.rootPath, path)
|
sysPath := filepath.Join(b.rootPath, path)
|
||||||
entries, err := os.ReadDir(sysPath)
|
entries, err := os.ReadDir(sysPath)
|
||||||
if os.IsNotExist(err) && b.readOnly && path == "" { //non initialized tree in read only mode
|
if os.IsNotExist(err) && b.readOnly && path == "" { //non initialized tree in read only mode
|
||||||
|
@ -216,6 +221,11 @@ func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Addres
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
|
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
|
||||||
|
if path == "" {
|
||||||
|
b.dbFilesGuard.RLock()
|
||||||
|
defer b.dbFilesGuard.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
sysPath := filepath.Join(b.rootPath, path)
|
sysPath := filepath.Join(b.rootPath, path)
|
||||||
entries, err := os.ReadDir(sysPath)
|
entries, err := os.ReadDir(sysPath)
|
||||||
if os.IsNotExist(err) && b.readOnly && path == "" { //non initialized tree in read only mode
|
if os.IsNotExist(err) && b.readOnly && path == "" { //non initialized tree in read only mode
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
package blobovniczatree
|
package blobovniczatree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -12,9 +14,13 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errClosingClosedBlobovnicza = errors.New("closing closed blobovnicza is not allowed")
|
||||||
|
)
|
||||||
|
|
||||||
// sharedDB is responsible for opening and closing a file of single blobovnicza.
|
// sharedDB is responsible for opening and closing a file of single blobovnicza.
|
||||||
type sharedDB struct {
|
type sharedDB struct {
|
||||||
guard *sync.RWMutex
|
cond *sync.Cond
|
||||||
blcza *blobovnicza.Blobovnicza
|
blcza *blobovnicza.Blobovnicza
|
||||||
refCount uint32
|
refCount uint32
|
||||||
|
|
||||||
|
@ -30,8 +36,9 @@ type sharedDB struct {
|
||||||
func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
|
func newSharedDB(options []blobovnicza.Option, path string, readOnly bool,
|
||||||
metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger) *sharedDB {
|
metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger) *sharedDB {
|
||||||
return &sharedDB{
|
return &sharedDB{
|
||||||
guard: &sync.RWMutex{},
|
cond: &sync.Cond{
|
||||||
|
L: &sync.RWMutex{},
|
||||||
|
},
|
||||||
options: options,
|
options: options,
|
||||||
path: path,
|
path: path,
|
||||||
readOnly: readOnly,
|
readOnly: readOnly,
|
||||||
|
@ -47,8 +54,8 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
|
||||||
return nil, errClosed
|
return nil, errClosed
|
||||||
}
|
}
|
||||||
|
|
||||||
b.guard.Lock()
|
b.cond.L.Lock()
|
||||||
defer b.guard.Unlock()
|
defer b.cond.L.Unlock()
|
||||||
|
|
||||||
if b.refCount > 0 {
|
if b.refCount > 0 {
|
||||||
b.refCount++
|
b.refCount++
|
||||||
|
@ -76,11 +83,12 @@ func (b *sharedDB) Open() (*blobovnicza.Blobovnicza, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *sharedDB) Close() {
|
func (b *sharedDB) Close() {
|
||||||
b.guard.Lock()
|
b.cond.L.Lock()
|
||||||
defer b.guard.Unlock()
|
defer b.cond.L.Unlock()
|
||||||
|
|
||||||
if b.refCount == 0 {
|
if b.refCount == 0 {
|
||||||
b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
|
b.log.Error(logs.AttemtToCloseAlreadyClosedBlobovnicza, zap.String("id", b.path))
|
||||||
|
b.cond.Broadcast()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,9 +106,38 @@ func (b *sharedDB) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
b.refCount--
|
b.refCount--
|
||||||
|
if b.refCount == 1 {
|
||||||
|
b.cond.Broadcast()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *sharedDB) Path() string {
|
func (b *sharedDB) CloseAndRemoveFile() error {
|
||||||
|
b.cond.L.Lock()
|
||||||
|
if b.refCount > 1 {
|
||||||
|
b.cond.Wait()
|
||||||
|
}
|
||||||
|
defer b.cond.L.Unlock()
|
||||||
|
|
||||||
|
if b.refCount == 0 {
|
||||||
|
return errClosingClosedBlobovnicza
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.blcza.Close(); err != nil {
|
||||||
|
b.log.Error(logs.BlobovniczatreeCouldNotCloseBlobovnicza,
|
||||||
|
zap.String("id", b.path),
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
return fmt.Errorf("failed to close blobovnicza (path = %s): %w", b.path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.refCount = 0
|
||||||
|
b.blcza = nil
|
||||||
|
b.openDBCounter.Dec()
|
||||||
|
|
||||||
|
return os.Remove(b.path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *sharedDB) SystemPath() string {
|
||||||
return b.path
|
return b.path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,6 +201,13 @@ func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB {
|
||||||
return db
|
return db
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *levelDbManager) hasAnyDB() bool {
|
||||||
|
m.dbMtx.RLock()
|
||||||
|
defer m.dbMtx.RUnlock()
|
||||||
|
|
||||||
|
return len(m.databases) > 0
|
||||||
|
}
|
||||||
|
|
||||||
// dbManager manages the opening and closing of blobovnicza instances.
|
// dbManager manages the opening and closing of blobovnicza instances.
|
||||||
//
|
//
|
||||||
// The blobovnicza opens at the first request, closes after the last request.
|
// The blobovnicza opens at the first request, closes after the last request.
|
||||||
|
@ -201,6 +245,17 @@ func (m *dbManager) GetByPath(path string) *sharedDB {
|
||||||
return levelManager.GetByIndex(curIndex)
|
return levelManager.GetByIndex(curIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *dbManager) CleanResources(path string) {
|
||||||
|
lvlPath := filepath.Dir(path)
|
||||||
|
|
||||||
|
m.levelToManagerGuard.Lock()
|
||||||
|
defer m.levelToManagerGuard.Unlock()
|
||||||
|
|
||||||
|
if result, ok := m.levelToManager[lvlPath]; ok && !result.hasAnyDB() {
|
||||||
|
delete(m.levelToManager, lvlPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *dbManager) Open() {
|
func (m *dbManager) Open() {
|
||||||
m.closedFlag.Store(false)
|
m.closedFlag.Store(false)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package blobovniczatree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/fs"
|
"io/fs"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
|
||||||
|
@ -23,6 +24,7 @@ type cfg struct {
|
||||||
// reportError is the function called when encountering disk errors.
|
// reportError is the function called when encountering disk errors.
|
||||||
reportError func(string, error)
|
reportError func(string, error)
|
||||||
metrics Metrics
|
metrics Metrics
|
||||||
|
waitBeforeDropDB time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
@ -32,6 +34,7 @@ const (
|
||||||
defaultOpenedCacheSize = 50
|
defaultOpenedCacheSize = 50
|
||||||
defaultBlzShallowDepth = 2
|
defaultBlzShallowDepth = 2
|
||||||
defaultBlzShallowWidth = 16
|
defaultBlzShallowWidth = 16
|
||||||
|
defaultWaitBeforeDropDB = 10 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
func initConfig(c *cfg) {
|
func initConfig(c *cfg) {
|
||||||
|
@ -43,6 +46,7 @@ func initConfig(c *cfg) {
|
||||||
blzShallowWidth: defaultBlzShallowWidth,
|
blzShallowWidth: defaultBlzShallowWidth,
|
||||||
reportError: func(string, error) {},
|
reportError: func(string, error) {},
|
||||||
metrics: &noopMetrics{},
|
metrics: &noopMetrics{},
|
||||||
|
waitBeforeDropDB: defaultWaitBeforeDropDB,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,3 +110,9 @@ func WithMetrics(m Metrics) Option {
|
||||||
c.metrics = m
|
c.metrics = m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithWaitBeforeDropDB(t time.Duration) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.waitBeforeDropDB = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -101,14 +101,14 @@ func (i *putIterator) iterate(ctx context.Context, lvlPath string) (bool, error)
|
||||||
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
|
i.B.reportError(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza, err)
|
||||||
} else {
|
} else {
|
||||||
i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
|
i.B.log.Debug(logs.BlobovniczatreeCouldNotPutObjectToActiveBlobovnicza,
|
||||||
zap.String("path", active.Path()),
|
zap.String("path", active.SystemPath()),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
idx := u64FromHexString(filepath.Base(active.Path()))
|
idx := u64FromHexString(filepath.Base(active.SystemPath()))
|
||||||
i.ID = NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))
|
i.ID = NewIDFromBytes([]byte(filepath.Join(lvlPath, u64ToHexString(idx))))
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
|
|
|
@ -2,10 +2,160 @@ package blobovniczatree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (b *Blobovniczas) Rebuild(_ context.Context, _ common.RebuildPrm) (common.RebuildRes, error) {
|
var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
|
||||||
return common.RebuildRes{}, nil
|
|
||||||
|
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
|
||||||
|
if b.readOnly {
|
||||||
|
return common.RebuildRes{}, common.ErrReadOnly
|
||||||
|
}
|
||||||
|
|
||||||
|
b.rebuildGuard.Lock()
|
||||||
|
defer b.rebuildGuard.Unlock()
|
||||||
|
|
||||||
|
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
|
||||||
|
var res common.RebuildRes
|
||||||
|
dbsToMigrate, err := b.getDBsToRebuild(ctx)
|
||||||
|
if err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
|
||||||
|
for _, db := range dbsToMigrate {
|
||||||
|
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
|
||||||
|
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage)
|
||||||
|
res.ObjectsMoved += movedObjects
|
||||||
|
if err != nil {
|
||||||
|
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
|
||||||
|
res.FilesRemoved++
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
|
||||||
|
dbsToMigrate := make(map[string]struct{})
|
||||||
|
if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
|
||||||
|
dbsToMigrate[s] = struct{}{}
|
||||||
|
return false, nil
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := b.iterateSortedLeaves(ctx, nil, func(s string) (bool, error) {
|
||||||
|
delete(dbsToMigrate, s)
|
||||||
|
return false, nil
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result := make([]string, 0, len(dbsToMigrate))
|
||||||
|
for db := range dbsToMigrate {
|
||||||
|
result = append(result, db)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage) (uint64, error) {
|
||||||
|
shDB := b.getBlobovnicza(path)
|
||||||
|
blz, err := shDB.Open()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
shDBClosed := false
|
||||||
|
defer func() {
|
||||||
|
if shDBClosed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
shDB.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
migratedObjects, err := b.moveObjects(ctx, blz, meta)
|
||||||
|
if err != nil {
|
||||||
|
return migratedObjects, err
|
||||||
|
}
|
||||||
|
shDBClosed, err = b.dropDB(ctx, path, shDB)
|
||||||
|
return migratedObjects, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, meta common.MetaStorage) (uint64, error) {
|
||||||
|
var result uint64
|
||||||
|
|
||||||
|
var prm blobovnicza.IteratePrm
|
||||||
|
prm.DecodeAddresses()
|
||||||
|
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
|
||||||
|
e := b.moveObject(ctx, ie.Address(), ie.ObjectData(), meta)
|
||||||
|
if e == nil {
|
||||||
|
result++
|
||||||
|
}
|
||||||
|
return e
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := blz.Iterate(ctx, prm)
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) moveObject(ctx context.Context, addr oid.Address, data []byte, metaStore common.MetaStorage) error {
|
||||||
|
var pPrm common.PutPrm
|
||||||
|
pPrm.Address = addr
|
||||||
|
pPrm.RawData = data
|
||||||
|
pRes, err := b.Put(ctx, pPrm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return metaStore.UpdateStorageID(ctx, addr, pRes.StorageID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) dropDB(ctx context.Context, path string, shDb *sharedDB) (bool, error) {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false, ctx.Err()
|
||||||
|
case <-time.After(b.waitBeforeDropDB): // to complete requests with old storage ID
|
||||||
|
}
|
||||||
|
|
||||||
|
b.dbCache.EvictAndMarkNonCached(path)
|
||||||
|
defer b.dbCache.RemoveFromNonCached(path)
|
||||||
|
|
||||||
|
b.dbFilesGuard.Lock()
|
||||||
|
defer b.dbFilesGuard.Unlock()
|
||||||
|
|
||||||
|
if err := shDb.CloseAndRemoveFile(); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
b.commondbManager.CleanResources(path)
|
||||||
|
if err := b.dropDirectoryIfEmpty(filepath.Dir(path)); err != nil {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
|
||||||
|
if path == "." {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sysPath := filepath.Join(b.rootPath, path)
|
||||||
|
entries, err := os.ReadDir(sysPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(entries) > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := os.Remove(sysPath); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return b.dropDirectoryIfEmpty(filepath.Dir(path))
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,132 @@
|
||||||
|
package blobovniczatree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBlobovniczaTreeRebuild(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("width increased", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 3, false)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("width reduced", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 1, true)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("depth increased", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
testBlobovniczaTreeRebuildHelper(t, 1, 2, 2, 2, true)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("depth reduced", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
testBlobovniczaTreeRebuildHelper(t, 2, 2, 1, 2, true)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
b := NewBlobovniczaTree(
|
||||||
|
WithLogger(test.NewLogger(t, true)),
|
||||||
|
WithObjectSizeLimit(2048),
|
||||||
|
WithBlobovniczaShallowWidth(sourceWidth),
|
||||||
|
WithBlobovniczaShallowDepth(sourceDepth),
|
||||||
|
WithRootPath(dir),
|
||||||
|
WithBlobovniczaSize(100*1024*1024),
|
||||||
|
WithWaitBeforeDropDB(0),
|
||||||
|
WithOpenedCacheSize(1000))
|
||||||
|
require.NoError(t, b.Open(false))
|
||||||
|
require.NoError(t, b.Init())
|
||||||
|
|
||||||
|
eg, egCtx := errgroup.WithContext(context.Background())
|
||||||
|
storageIDs := make(map[oid.Address][]byte)
|
||||||
|
storageIDsGuard := &sync.Mutex{}
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
eg.Go(func() error {
|
||||||
|
obj := blobstortest.NewObject(1024)
|
||||||
|
data, err := obj.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var prm common.PutPrm
|
||||||
|
prm.Address = object.AddressOf(obj)
|
||||||
|
prm.RawData = data
|
||||||
|
res, err := b.Put(egCtx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
storageIDsGuard.Lock()
|
||||||
|
storageIDs[prm.Address] = res.StorageID
|
||||||
|
storageIDsGuard.Unlock()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
require.NoError(t, b.Close())
|
||||||
|
|
||||||
|
b = NewBlobovniczaTree(
|
||||||
|
WithLogger(test.NewLogger(t, true)),
|
||||||
|
WithObjectSizeLimit(2048),
|
||||||
|
WithBlobovniczaShallowWidth(targetWidth),
|
||||||
|
WithBlobovniczaShallowDepth(targetDepth),
|
||||||
|
WithRootPath(dir),
|
||||||
|
WithBlobovniczaSize(100*1024*1024),
|
||||||
|
WithWaitBeforeDropDB(0),
|
||||||
|
WithOpenedCacheSize(1000))
|
||||||
|
require.NoError(t, b.Open(false))
|
||||||
|
require.NoError(t, b.Init())
|
||||||
|
|
||||||
|
for addr, storageID := range storageIDs {
|
||||||
|
var gPrm common.GetPrm
|
||||||
|
gPrm.Address = addr
|
||||||
|
gPrm.StorageID = storageID
|
||||||
|
_, err := b.Get(context.Background(), gPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
metaStub := &storageIDUpdateStub{
|
||||||
|
storageIDs: storageIDs,
|
||||||
|
}
|
||||||
|
var rPrm common.RebuildPrm
|
||||||
|
rPrm.MetaStorage = metaStub
|
||||||
|
rRes, err := b.Rebuild(context.Background(), rPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||||
|
require.Equal(t, shouldMigrate, dataMigrated)
|
||||||
|
|
||||||
|
for addr, storageID := range storageIDs {
|
||||||
|
var gPrm common.GetPrm
|
||||||
|
gPrm.Address = addr
|
||||||
|
gPrm.StorageID = storageID
|
||||||
|
_, err := b.Get(context.Background(), gPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, b.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
type storageIDUpdateStub struct {
|
||||||
|
storageIDs map[oid.Address][]byte
|
||||||
|
updatedCount uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
|
||||||
|
s.storageIDs[addr] = storageID
|
||||||
|
s.updatedCount++
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -3,7 +3,7 @@ package common
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RebuildRes struct {
|
type RebuildRes struct {
|
||||||
|
@ -16,5 +16,5 @@ type RebuildPrm struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetaStorage interface {
|
type MetaStorage interface {
|
||||||
UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error
|
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,12 +5,12 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type StorageIDUpdate interface {
|
type StorageIDUpdate interface {
|
||||||
UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error
|
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate) error {
|
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate) error {
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -77,13 +77,20 @@ type mbStorageIDUpdate struct {
|
||||||
mb *meta.DB
|
mb *meta.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error {
|
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
if u.mb == nil {
|
if u.mb == nil {
|
||||||
return errMBIsNotAvailable
|
return errMBIsNotAvailable
|
||||||
}
|
}
|
||||||
var prm meta.PutPrm
|
|
||||||
prm.SetObject(obj)
|
var prm meta.UpdateStorageIDPrm
|
||||||
|
prm.SetAddress(addr)
|
||||||
prm.SetStorageID(storageID)
|
prm.SetStorageID(storageID)
|
||||||
_, err := u.mb.Put(ctx, prm)
|
_, err := u.mb.UpdateStorageID(prm)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue