forked from TrueCloudLab/frostfs-node
[#1226] blobovniczatree: Drop leaf width limitation
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
40c9ddb6ba
commit
78b1d9b18d
10 changed files with 50 additions and 55 deletions
|
@ -37,17 +37,17 @@ type activeDBManager struct {
|
|||
closed bool
|
||||
|
||||
dbManager *dbManager
|
||||
leafWidth uint64
|
||||
rootPath string
|
||||
}
|
||||
|
||||
func newActiveDBManager(dbManager *dbManager, leafWidth uint64) *activeDBManager {
|
||||
func newActiveDBManager(dbManager *dbManager, rootPath string) *activeDBManager {
|
||||
return &activeDBManager{
|
||||
levelToActiveDBGuard: &sync.RWMutex{},
|
||||
levelToActiveDB: make(map[string]*sharedDB),
|
||||
levelLock: utilSync.NewKeyLocker[string](),
|
||||
|
||||
dbManager: dbManager,
|
||||
leafWidth: leafWidth,
|
||||
rootPath: rootPath,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -144,30 +144,25 @@ func (m *activeDBManager) updateAndGetActive(lvlPath string) (*activeDB, error)
|
|||
}
|
||||
|
||||
func (m *activeDBManager) getNextSharedDB(lvlPath string) (*sharedDB, error) {
|
||||
var idx uint64
|
||||
var iterCount uint64
|
||||
var nextActiveDBIdx uint64
|
||||
hasActive, currentIdx := m.hasActiveDB(lvlPath)
|
||||
if hasActive {
|
||||
idx = (currentIdx + 1) % m.leafWidth
|
||||
}
|
||||
|
||||
var next *sharedDB
|
||||
|
||||
for iterCount < m.leafWidth {
|
||||
path := filepath.Join(lvlPath, u64ToHexStringExt(idx))
|
||||
shDB := m.dbManager.GetByPath(path)
|
||||
db, err := shDB.Open() // open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close()
|
||||
nextActiveDBIdx = currentIdx + 1
|
||||
} else {
|
||||
hasDBs, maxIdx, err := getBlobovniczaMaxIndex(filepath.Join(m.rootPath, lvlPath))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if db.IsFull() {
|
||||
shDB.Close()
|
||||
} else {
|
||||
next = shDB
|
||||
break
|
||||
if hasDBs {
|
||||
nextActiveDBIdx = maxIdx
|
||||
}
|
||||
idx = (idx + 1) % m.leafWidth
|
||||
iterCount++
|
||||
}
|
||||
|
||||
path := filepath.Join(lvlPath, u64ToHexStringExt(nextActiveDBIdx))
|
||||
next := m.dbManager.GetByPath(path)
|
||||
_, err := next.Open() // open db to hold active DB open, will be closed if db is full, after m.replace or by activeDBManager.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
previous, updated := m.replace(lvlPath, next)
|
||||
|
|
|
@ -3,6 +3,7 @@ package blobovniczatree
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -81,12 +82,8 @@ func NewBlobovniczaTree(ctx context.Context, opts ...Option) (blz *Blobovniczas)
|
|||
opts[i](&blz.cfg)
|
||||
}
|
||||
|
||||
if blz.blzLeafWidth == 0 {
|
||||
blz.blzLeafWidth = blz.blzShallowWidth
|
||||
}
|
||||
|
||||
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
|
||||
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
|
||||
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.rootPath)
|
||||
blz.dbCache = newDBCache(ctx, blz.openedCacheSize,
|
||||
blz.openedCacheTTL, blz.openedCacheExpInterval, blz.commondbManager)
|
||||
blz.deleteProtectedObjects = newAddressMap()
|
||||
|
@ -124,6 +121,29 @@ func u64FromHexString(str string) uint64 {
|
|||
return v
|
||||
}
|
||||
|
||||
func getBlobovniczaMaxIndex(directory string) (bool, uint64, error) {
|
||||
entries, err := os.ReadDir(directory)
|
||||
if os.IsNotExist(err) { // non initialized tree
|
||||
return false, 0, nil
|
||||
}
|
||||
if err != nil {
|
||||
return false, 0, err
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return false, 0, nil
|
||||
}
|
||||
var hasDBs bool
|
||||
var maxIdx uint64
|
||||
for _, e := range entries {
|
||||
if e.IsDir() {
|
||||
continue
|
||||
}
|
||||
hasDBs = true
|
||||
maxIdx = max(u64FromHexString(e.Name()), maxIdx)
|
||||
}
|
||||
return hasDBs, maxIdx, nil
|
||||
}
|
||||
|
||||
// Type is blobovniczatree storage type used in logs and configuration.
|
||||
const Type = "blobovnicza"
|
||||
|
||||
|
|
|
@ -130,7 +130,14 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
|
|||
isLeafLevel := uint64(len(curPath)) == b.blzShallowDepth
|
||||
levelWidth := b.blzShallowWidth
|
||||
if isLeafLevel {
|
||||
levelWidth = b.blzLeafWidth
|
||||
hasDBs, maxIdx, err := getBlobovniczaMaxIndex(filepath.Join(append([]string{b.rootPath}, curPath...)...))
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
levelWidth = 0
|
||||
if hasDBs {
|
||||
levelWidth = maxIdx + 1
|
||||
}
|
||||
}
|
||||
indices := indexSlice(levelWidth)
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@ type cfg struct {
|
|||
openedCacheSize int
|
||||
blzShallowDepth uint64
|
||||
blzShallowWidth uint64
|
||||
blzLeafWidth uint64
|
||||
compression *compression.Config
|
||||
blzOpts []blobovnicza.Option
|
||||
reportError func(string, error) // reportError is the function called when encountering disk errors.
|
||||
|
@ -82,12 +81,6 @@ func WithBlobovniczaShallowWidth(width uint64) Option {
|
|||
}
|
||||
}
|
||||
|
||||
func WithBlobovniczaLeafWidth(w uint64) Option {
|
||||
return func(c *cfg) {
|
||||
c.blzLeafWidth = w
|
||||
}
|
||||
}
|
||||
|
||||
func WithBlobovniczaShallowDepth(depth uint64) Option {
|
||||
return func(c *cfg) {
|
||||
c.blzShallowDepth = depth
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue