Blobovnicza tree rebuild #812

Merged
fyrchik merged 17 commits from dstepanov-yadro/frostfs-node:feat/shard_migrator_master into master 2024-09-04 19:51:04 +00:00
10 changed files with 355 additions and 27 deletions
Showing only changes of commit c1667a11d2 - Show all commits

View file

@ -81,7 +81,7 @@ func NewBlobovniczaTree(opts ...Option) (blz *Blobovniczas) {
blz.blzLeafWidth = blz.blzShallowWidth
}
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.blzLeafWidth, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
blz.commondbManager = newDBManager(blz.rootPath, blz.blzOpts, blz.readOnly, blz.metrics.Blobovnicza(), blz.log)
blz.activeDBManager = newActiveDBManager(blz.commondbManager, blz.blzLeafWidth)
blz.dbCache = newDBCache(blz.openedCacheSize, blz.commondbManager)

View file

@ -8,6 +8,7 @@ import (
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"go.uber.org/zap"
)
@ -39,7 +40,36 @@ func (b *Blobovniczas) Init() error {
return nil
}
return b.iterateLeaves(context.TODO(), func(p string) (bool, error) {
return b.initializeDBs(context.TODO())
}
func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
err := util.MkdirAllX(b.rootPath, b.perm)
if err != nil {
return err
}
visited := make(map[string]struct{})
err = b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
visited[p] = struct{}{}
shBlz := b.getBlobovniczaWithoutCaching(p)
_, err := shBlz.Open()
if err != nil {
return true, err
}
defer shBlz.Close()
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return false, nil
})
if err != nil {
return err
}
return b.iterateSortedLeaves(ctx, nil, func(p string) (bool, error) {
if _, found := visited[p]; found {
return false, nil
}
shBlz := b.getBlobovniczaWithoutCaching(p)
_, err := shBlz.Open()
if err != nil {

View file

@ -1,12 +1,16 @@
package blobovniczatree
import (
"context"
"os"
"path/filepath"
"strings"
"testing"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobovnicza"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"github.com/stretchr/testify/require"
)
@ -67,3 +71,119 @@ func validateTestTree(t *testing.T, path string) {
}
}
}
func TestObjectsAvailableAfterDepthAndWidthEdit(t *testing.T) {
t.Parallel()
rootDir := t.TempDir()
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
obj35 := blobstortest.NewObject(10 * 1024)
addr35 := objectCore.AddressOf(obj35)
raw, err := obj35.Marshal()
require.NoError(t, err)
pRes35, err := blz.Put(context.Background(), common.PutPrm{
Address: addr35,
Object: obj35,
RawData: raw,
})
require.NoError(t, err)
gRes, err := blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
require.NoError(t, blz.Close())
// change depth and width
blz = NewBlobovniczaTree(
WithBlobovniczaShallowDepth(5),
WithBlobovniczaShallowWidth(2),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
obj52 := blobstortest.NewObject(10 * 1024)
addr52 := objectCore.AddressOf(obj52)
raw, err = obj52.Marshal()
require.NoError(t, err)
pRes52, err := blz.Put(context.Background(), common.PutPrm{
Address: addr52,
Object: obj52,
RawData: raw,
})
require.NoError(t, err)
require.NoError(t, blz.Close())
// change depth and width back
blz = NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(rootDir),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
StorageID: pRes35.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr35,
})
require.NoError(t, err)
require.EqualValues(t, obj35, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr52,
StorageID: pRes52.StorageID,
})
require.NoError(t, err)
require.EqualValues(t, obj52, gRes.Object)
gRes, err = blz.Get(context.Background(), common.GetPrm{
Address: addr52,
})
require.NoError(t, err)
require.EqualValues(t, obj52, gRes.Object)
require.NoError(t, blz.Close())
}

View file

@ -63,7 +63,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
objectFound := false
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.deleteObjectFromLevel(ctx, bPrm, p)
if err != nil {
if !client.IsErrObjectNotFound(err) {

View file

@ -51,7 +51,7 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
var gPrm blobovnicza.GetPrm
gPrm.SetAddress(prm.Address)
err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err := b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
_, err := b.getObjectFromLevel(ctx, gPrm, p)
if err != nil {
if !client.IsErrObjectNotFound(err) {

View file

@ -63,7 +63,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
return res, err
}
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.getObjectFromLevel(ctx, bPrm, p)
if err != nil {
if !client.IsErrObjectNotFound(err) {

View file

@ -64,7 +64,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
objectFound := false
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
err = b.iterateSortedDBPaths(ctx, prm.Address, func(p string) (bool, error) {
res, err = b.getRangeFromLevel(ctx, prm, p)
if err != nil {
outOfBounds := isErrOutOfRange(err)

View file

@ -3,6 +3,7 @@ package blobovniczatree
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
@ -70,7 +71,7 @@ func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (comm
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
return b.iterateLeaves(ctx, func(p string) (bool, error) {
return b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
shBlz := b.getBlobovnicza(p)
blz, err := shBlz.Open()
if err != nil {
@ -91,7 +92,9 @@ func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors boo
})
}
// iterator over the paths of Blobovniczas sorted by weight.
// iterateSortedLeaves iterates over the paths of Blobovniczas sorted by weight.
//
// Uses depth, width and leaf width for iteration.
func (b *Blobovniczas) iterateSortedLeaves(ctx context.Context, addr *oid.Address, f func(string) (bool, error)) error {
_, err := b.iterateSorted(
ctx,
@ -169,9 +172,105 @@ func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, cur
return false, nil
}
// iterator over the paths of Blobovniczas in random order.
func (b *Blobovniczas) iterateLeaves(ctx context.Context, f func(string) (bool, error)) error {
return b.iterateSortedLeaves(ctx, nil, f)
// iterateExistingDBPaths iterates over the paths of Blobovniczas without any order.
//
// Uses existed blobovnicza files for iteration.
func (b *Blobovniczas) iterateExistingDBPaths(ctx context.Context, f func(string) (bool, error)) error {
_, err := b.iterateExistingDBPathsDFS(ctx, "", f)
return err
}
func (b *Blobovniczas) iterateExistingDBPathsDFS(ctx context.Context, path string, f func(string) (bool, error)) (bool, error) {
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
return false, nil
}
if err != nil {
return false, err
}
for _, entry := range entries {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
if entry.IsDir() {
stop, err := b.iterateExistingDBPathsDFS(ctx, filepath.Join(path, entry.Name()), f)
if err != nil {
return false, err
}
if stop {
return true, nil
}
} else {
stop, err := f(filepath.Join(path, entry.Name()))
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
return false, nil
}
func (b *Blobovniczas) iterateSortedDBPaths(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error {
_, err := b.iterateSordedDBPathsInternal(ctx, "", addr, f)
return err
}
func (b *Blobovniczas) iterateSordedDBPathsInternal(ctx context.Context, path string, addr oid.Address, f func(string) (bool, error)) (bool, error) {
sysPath := filepath.Join(b.rootPath, path)
entries, err := os.ReadDir(sysPath)
if os.IsNotExist(err) && b.readOnly && path == "" { // non initialized tree in read only mode
return false, nil
}
if err != nil {
return false, err
}
var dbIdxs []uint64
var dirIdxs []uint64
for _, entry := range entries {
idx := u64FromHexString(entry.Name())
if entry.IsDir() {
dirIdxs = append(dirIdxs, idx)
} else {
dbIdxs = append(dbIdxs, idx)
}
}
if len(dbIdxs) > 0 {
hrw.SortSliceByValue(dbIdxs, addressHash(&addr, path))
for _, dbIdx := range dbIdxs {
dbPath := filepath.Join(path, u64ToHexStringExt(dbIdx))
stop, err := f(dbPath)
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
if len(dirIdxs) > 0 {
hrw.SortSliceByValue(dirIdxs, addressHash(&addr, path))
for _, dirIdx := range dirIdxs {
dirPath := filepath.Join(path, u64ToHexString(dirIdx))
stop, err := b.iterateSordedDBPathsInternal(ctx, dirPath, addr, f)
if err != nil {
return false, err
}
if stop {
return true, nil
}
}
}
return false, nil
}
// makes slice of uint64 values from 0 to number-1.

View file

@ -0,0 +1,41 @@
package blobovniczatree
import (
"context"
"testing"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func TestIterateSortedLeavesAndDBPathsAreSame(t *testing.T) {
t.Parallel()
blz := NewBlobovniczaTree(
WithBlobovniczaShallowDepth(3),
WithBlobovniczaShallowWidth(5),
WithRootPath(t.TempDir()),
)
require.NoError(t, blz.Open(false))
require.NoError(t, blz.Init())
defer func() {
require.NoError(t, blz.Close())
}()
addr := oidtest.Address()
var leaves []string
var dbPaths []string
blz.iterateSortedLeaves(context.Background(), &addr, func(s string) (bool, error) {
leaves = append(leaves, s)
return false, nil
})
blz.iterateSortedDBPaths(context.Background(), addr, func(s string) (bool, error) {
dbPaths = append(dbPaths, s)
return false, nil
})
require.Equal(t, leaves, dbPaths)
}

View file

@ -107,25 +107,65 @@ func (b *sharedDB) Path() string {
// levelDbManager stores pointers of the sharedDB's for the leaf directory of the blobovnicza tree.
type levelDbManager struct {
databases []*sharedDB
dbMtx *sync.RWMutex
databases map[uint64]*sharedDB
options []blobovnicza.Option
path string
readOnly bool
metrics blobovnicza.Metrics
openDBCounter *openDBCounter
closedFlag *atomic.Bool
log *logger.Logger
}
func newLevelDBManager(width uint64, options []blobovnicza.Option, rootPath string, lvlPath string,
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlog *atomic.Bool, log *logger.Logger,
func newLevelDBManager(options []blobovnicza.Option, rootPath string, lvlPath string,
readOnly bool, metrics blobovnicza.Metrics, openDBCounter *openDBCounter, closedFlag *atomic.Bool, log *logger.Logger,
) *levelDbManager {
result := &levelDbManager{
databases: make([]*sharedDB, width),
}
for idx := uint64(0); idx < width; idx++ {
result.databases[idx] = newSharedDB(options, filepath.Join(rootPath, lvlPath, u64ToHexStringExt(idx)), readOnly, metrics, openDBCounter, closedFlog, log)
databases: make(map[uint64]*sharedDB),
dbMtx: &sync.RWMutex{},
options: options,
path: filepath.Join(rootPath, lvlPath),
readOnly: readOnly,
metrics: metrics,
openDBCounter: openDBCounter,
closedFlag: closedFlag,
log: log,
}
return result
}
func (m *levelDbManager) GetByIndex(idx uint64) *sharedDB {
res := m.getDBIfExists(idx)
if res != nil {
return res
}
return m.getOrCreateDB(idx)
}
func (m *levelDbManager) getDBIfExists(idx uint64) *sharedDB {
m.dbMtx.RLock()
defer m.dbMtx.RUnlock()
return m.databases[idx]
}
func (m *levelDbManager) getOrCreateDB(idx uint64) *sharedDB {
m.dbMtx.Lock()
defer m.dbMtx.Unlock()
db := m.databases[idx]
if db != nil {
return db
}
db = newSharedDB(m.options, filepath.Join(m.path, u64ToHexStringExt(idx)), m.readOnly, m.metrics, m.openDBCounter, m.closedFlag, m.log)
m.databases[idx] = db
return db
}
// dbManager manages the opening and closing of blobovnicza instances.
//
// The blobovnicza opens at the first request, closes after the last request.
@ -135,21 +175,19 @@ type dbManager struct {
closedFlag *atomic.Bool
dbCounter *openDBCounter
rootPath string
options []blobovnicza.Option
readOnly bool
metrics blobovnicza.Metrics
leafWidth uint64
log *logger.Logger
rootPath string
options []blobovnicza.Option
readOnly bool
metrics blobovnicza.Metrics
log *logger.Logger
}
func newDBManager(rootPath string, options []blobovnicza.Option, leafWidth uint64, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
func newDBManager(rootPath string, options []blobovnicza.Option, readOnly bool, metrics blobovnicza.Metrics, log *logger.Logger) *dbManager {
return &dbManager{
rootPath: rootPath,
options: options,
readOnly: readOnly,
metrics: metrics,
leafWidth: leafWidth,
levelToManager: make(map[string]*levelDbManager),
levelToManagerGuard: &sync.RWMutex{},
log: log,
@ -197,7 +235,7 @@ func (m *dbManager) getOrCreateLevelManager(lvlPath string) *levelDbManager {
return result
}
result := newLevelDBManager(m.leafWidth, m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
result := newLevelDBManager(m.options, m.rootPath, lvlPath, m.readOnly, m.metrics, m.dbCounter, m.closedFlag, m.log)
m.levelToManager[lvlPath] = result
return result
}