[#9999] blobovniczatree: Add rebuild by fill percent

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Dmitrii Stepanov 2024-08-29 11:34:18 +03:00
parent c89c137281
commit c2814afbe6
9 changed files with 468 additions and 119 deletions

View file

@@ -57,3 +57,7 @@ func (b *Blobovnicza) itemDeleted(itemSize uint64) {
func (b *Blobovnicza) IsFull() bool {
return b.dataSize.Load() >= b.fullSizeLimit
}
func (b *Blobovnicza) FillPercent() int {
return int(100.0 * (float64(b.dataSize.Load()) / float64(b.fullSizeLimit)))
}
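
For reference, a minimal standalone sketch of the arithmetic above (the sizes are made up, not part of the commit): a 64 KB payload in a 100 KB blobovnicza reports 64%, the figure the fill-percent tests below lean on.

package main

import "fmt"

// fillPercent mirrors Blobovnicza.FillPercent above: used bytes as an
// integer percentage of the configured size limit.
func fillPercent(dataSize, fullSizeLimit uint64) int {
	return int(100.0 * (float64(dataSize) / float64(fullSizeLimit)))
}

func main() {
	// 64 KB of payload in a 100 KB blobovnicza -> 64%.
	fmt.Println(fillPercent(64*1024, 100*1024)) // prints 64
}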

View file

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
@@ -59,7 +60,7 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
-dbsToMigrate, err := b.getDBsToRebuild(ctx)
+dbsToMigrate, err := b.getDBsToRebuild(ctx, prm.Action)
if err != nil {
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
success = false
@@ -93,7 +94,33 @@ func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.
return res, nil
}
-func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
+func (b *Blobovniczas) getDBsToRebuild(ctx context.Context, action common.RebuildAction) ([]string, error) {
schemaChange := make(map[string]struct{})
fillPercent := make(map[string]struct{})
var err error
if action.SchemaChange {
schemaChange, err = b.selectDBsDoNotMatchSchema(ctx)
if err != nil {
return nil, err
}
}
if action.FillPercent {
fillPercent, err = b.selectDBsDoNotMatchFillPercent(ctx, action.FillPercentValue)
if err != nil {
return nil, err
}
}
for k := range fillPercent {
schemaChange[k] = struct{}{}
}
result := make([]string, 0, len(schemaChange))
for db := range schemaChange {
result = append(result, db)
}
return result, nil
}
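
The selection above is a plain set union, so a DB flagged by both criteria is rebuilt only once. A hypothetical standalone sketch of the same merge (names are illustrative, not from the commit):

package main

import "fmt"

// unionKeys folds set b into set a and returns the keys, mirroring how
// getDBsToRebuild merges schema-change and fill-percent candidates so a
// DB matching both criteria appears in the result exactly once.
func unionKeys(a, b map[string]struct{}) []string {
	for k := range b {
		a[k] = struct{}{}
	}
	result := make([]string, 0, len(a))
	for k := range a {
		result = append(result, k)
	}
	return result
}

func main() {
	schema := map[string]struct{}{"0/0": {}, "0/1": {}}
	fill := map[string]struct{}{"0/1": {}, "0/2": {}}
	fmt.Println(len(unionKeys(schema, fill))) // prints 3
}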
func (b *Blobovniczas) selectDBsDoNotMatchSchema(ctx context.Context) (map[string]struct{}, error) {
dbsToMigrate := make(map[string]struct{})
if err := b.iterateExistingDBPaths(ctx, func(s string) (bool, error) {
dbsToMigrate[s] = struct{}{}
@@ -107,13 +134,69 @@ func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
}); err != nil {
return nil, err
}
-result := make([]string, 0, len(dbsToMigrate))
-for db := range dbsToMigrate {
-result = append(result, db)
+return dbsToMigrate, nil
}
func (b *Blobovniczas) selectDBsDoNotMatchFillPercent(ctx context.Context, target int) (map[string]struct{}, error) {
if target <= 0 || target > 100 {
return nil, fmt.Errorf("invalid fill percent value %d: must be (0; 100]", target)
}
result := make(map[string]struct{})
if err := b.iterateDeepest(ctx, oid.Address{}, func(lvlPath string) (bool, error) {
dir := filepath.Join(b.rootPath, lvlPath)
entries, err := os.ReadDir(dir)
if os.IsNotExist(err) { // non-initialized tree
return false, nil
}
if err != nil {
return false, err
}
hasDBs := false
// the db with the max index could be the active one, so it should not be rebuilt
var maxIdx uint64
for _, e := range entries {
if e.IsDir() || strings.HasSuffix(e.Name(), rebuildSuffix) {
continue
}
hasDBs = true
maxIdx = max(u64FromHexString(e.Name()), maxIdx)
}
if !hasDBs {
return false, nil
}
for _, e := range entries {
if e.IsDir() || strings.HasSuffix(e.Name(), rebuildSuffix) {
continue
}
if u64FromHexString(e.Name()) == maxIdx {
continue
}
path := filepath.Join(lvlPath, e.Name())
resettlementRequired, err := b.fillPercentIsLow(path, target)
if err != nil {
return false, err
}
if resettlementRequired {
result[path] = struct{}{}
}
}
return false, nil
}); err != nil {
return nil, err
}
return result, nil
}
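
The selector skips the DB with the largest index in each directory because it may still be the active write target. A hypothetical standalone sketch of that rule, assuming DB file names are hex-encoded indices (as u64FromHexString implies) and using ".rebuild" as a stand-in for the package's rebuildSuffix:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// activeIdx returns the largest hex-encoded index among DB file names;
// the DB with this index may still be the active write target, so the
// selector above never schedules it for resettlement.
func activeIdx(names []string) uint64 {
	var maxIdx uint64
	for _, n := range names {
		// ".rebuild" stands in for the package's rebuildSuffix.
		if strings.HasSuffix(n, ".rebuild") {
			continue
		}
		// stand-in for the package's u64FromHexString
		idx, err := strconv.ParseUint(n, 16, 64)
		if err != nil {
			continue
		}
		maxIdx = max(idx, maxIdx)
	}
	return maxIdx
}

func main() {
	fmt.Println(activeIdx([]string{"0", "1", "a"})) // prints 10
}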
func (b *Blobovniczas) fillPercentIsLow(path string, target int) (bool, error) {
shDB := b.getBlobovnicza(path)
blz, err := shDB.Open()
if err != nil {
return false, err
}
defer shDB.Close()
return blz.FillPercent() < target, nil
}
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
shDB := b.getBlobovnicza(path)
blz, err := shDB.Open()

View file

@@ -15,7 +15,7 @@ import (
"golang.org/x/sync/errgroup"
)
-func TestBlobovniczaTreeRebuild(t *testing.T) {
+func TestBlobovniczaTreeSchemaRebuild(t *testing.T) {
t.Parallel()
t.Run("width increased", func(t *testing.T) {
@@ -39,6 +39,197 @@ func TestBlobovniczaTreeRebuild(t *testing.T) {
})
}
func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
t.Parallel()
t.Run("no rebuild by fill percent", func(t *testing.T) {
t.Parallel()
dir := t.TempDir()
b := NewBlobovniczaTree(
context.Background(),
WithLogger(test.NewLogger(t)),
WithObjectSizeLimit(64*1024),
WithBlobovniczaShallowWidth(1), // single directory
WithBlobovniczaShallowDepth(1),
WithRootPath(dir),
WithBlobovniczaSize(100*1024), // 100 KB limit for each blobovnicza
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(mode.ComponentReadWrite))
require.NoError(t, b.Init())
storageIDs := make(map[oid.Address][]byte)
for i := 0; i < 100; i++ {
obj := blobstortest.NewObject(64 * 1024) // 64KB object
data, err := obj.Marshal()
require.NoError(t, err)
var prm common.PutPrm
prm.Address = object.AddressOf(obj)
prm.RawData = data
res, err := b.Put(context.Background(), prm)
require.NoError(t, err)
storageIDs[prm.Address] = res.StorageID
}
metaStub := &storageIDUpdateStub{
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
Action: common.RebuildAction{
SchemaChange: false,
FillPercent: true,
FillPercentValue: 60,
},
})
require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
require.False(t, dataMigrated)
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
require.NoError(t, b.Close())
})
t.Run("no rebuild single db", func(t *testing.T) {
t.Parallel()
dir := t.TempDir()
b := NewBlobovniczaTree(
context.Background(),
WithLogger(test.NewLogger(t)),
WithObjectSizeLimit(64*1024),
WithBlobovniczaShallowWidth(1), // single directory
WithBlobovniczaShallowDepth(1),
WithRootPath(dir),
WithBlobovniczaSize(100*1024), // 100 KB soft limit for each blobovnicza
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(mode.ComponentReadWrite))
require.NoError(t, b.Init())
storageIDs := make(map[oid.Address][]byte)
obj := blobstortest.NewObject(64 * 1024) // 64KB object
data, err := obj.Marshal()
require.NoError(t, err)
var prm common.PutPrm
prm.Address = object.AddressOf(obj)
prm.RawData = data
res, err := b.Put(context.Background(), prm)
require.NoError(t, err)
storageIDs[prm.Address] = res.StorageID
metaStub := &storageIDUpdateStub{
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
Action: common.RebuildAction{
SchemaChange: false,
FillPercent: true,
FillPercentValue: 90, // fill percent is 64 KB / 100 KB = 64%, but the single DB is the active one
},
})
require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
require.False(t, dataMigrated)
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
require.NoError(t, b.Close())
})
t.Run("rebuild by fill percent", func(t *testing.T) {
t.Parallel()
dir := t.TempDir()
b := NewBlobovniczaTree(
context.Background(),
WithLogger(test.NewLogger(t)),
WithObjectSizeLimit(64*1024),
WithBlobovniczaShallowWidth(1), // single directory
WithBlobovniczaShallowDepth(1),
WithRootPath(dir),
WithBlobovniczaSize(100*1024), // 100 KB limit for each blobovnicza
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(mode.ComponentReadWrite))
require.NoError(t, b.Init())
storageIDs := make(map[oid.Address][]byte)
toDelete := make(map[oid.Address][]byte)
for i := 0; i < 100; i++ { // 2 objects per blobovnicza, so 50 DBs total will be created
obj := blobstortest.NewObject(64 * 1024)
data, err := obj.Marshal()
require.NoError(t, err)
var prm common.PutPrm
prm.Address = object.AddressOf(obj)
prm.RawData = data
res, err := b.Put(context.Background(), prm)
require.NoError(t, err)
storageIDs[prm.Address] = res.StorageID
if i%2 == 1 {
toDelete[prm.Address] = res.StorageID
}
}
for addr, storageID := range toDelete {
var prm common.DeletePrm
prm.Address = addr
prm.StorageID = storageID
_, err := b.Delete(context.Background(), prm)
require.NoError(t, err)
}
metaStub := &storageIDUpdateStub{
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
Action: common.RebuildAction{
SchemaChange: false,
FillPercent: true,
FillPercentValue: 80,
},
})
require.NoError(t, err)
require.Equal(t, uint64(49), rRes.FilesRemoved)
require.Equal(t, uint64(49), rRes.ObjectsMoved) // 49 DBs with 1 object each
require.Equal(t, uint64(49), metaStub.updatedCount)
for addr, storageID := range storageIDs {
if _, found := toDelete[addr]; found {
continue
}
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
require.NoError(t, b.Close())
})
}
func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
t.Parallel()

View file

@@ -11,9 +11,17 @@ type RebuildRes struct {
FilesRemoved uint64
}
type RebuildAction struct {
SchemaChange bool
FillPercent bool
FillPercentValue int
}
type RebuildPrm struct {
MetaStorage MetaStorage
WorkerLimiter ConcurrentWorkersLimiter
Action RebuildAction
}
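
A hypothetical caller-side sketch of filling in the new Action field (the helper and its anonymous interface parameter are illustrative, not part of this commit):

package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
)

// rebuildByFillPercent asks a substorage to resettle only under-filled
// DBs, leaving the schema untouched. st is anything exposing
// Rebuild(ctx, common.RebuildPrm).
func rebuildByFillPercent(
	ctx context.Context,
	st interface {
		Rebuild(context.Context, common.RebuildPrm) (common.RebuildRes, error)
	},
	meta common.MetaStorage,
	limiter common.ConcurrentWorkersLimiter,
	percent int,
) (common.RebuildRes, error) {
	return st.Rebuild(ctx, common.RebuildPrm{
		MetaStorage:   meta,
		WorkerLimiter: limiter,
		Action: common.RebuildAction{
			SchemaChange:     false,
			FillPercent:      true,
			FillPercentValue: percent, // DBs filled below this are resettled
		},
	})
}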
type MetaStorage interface {

View file

@@ -18,13 +18,14 @@ type ConcurrentWorkersLimiter interface {
ReleaseWorkSlot()
}
-func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter) error {
+func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, action common.RebuildAction) error {
var summary common.RebuildRes
var rErr error
for _, storage := range b.storage {
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
MetaStorage: upd,
WorkerLimiter: limiter,
Action: action,
})
summary.FilesRemoved += res.FilesRemoved
summary.ObjectsMoved += res.ObjectsMoved

View file

@@ -0,0 +1,173 @@
package shard
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type RebuildWorkerLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
type rebuildLimiter struct {
semaphore chan struct{}
}
func newRebuildLimiter(workersCount uint32) *rebuildLimiter {
return &rebuildLimiter{
semaphore: make(chan struct{}, workersCount),
}
}
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
select {
case l.semaphore <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (l *rebuildLimiter) ReleaseWorkSlot() {
<-l.semaphore
}
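
The limiter above is the standard channel-backed semaphore pattern. A standalone usage sketch outside the shard package (illustrative only):

package main

import (
	"context"
	"fmt"
	"sync"
)

// limiter reproduces the channel-backed semaphore used by rebuildLimiter.
type limiter struct{ sem chan struct{} }

func (l *limiter) AcquireWorkSlot(ctx context.Context) error {
	select {
	case l.sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (l *limiter) ReleaseWorkSlot() { <-l.sem }

func main() {
	l := &limiter{sem: make(chan struct{}, 2)} // at most 2 slots held at once
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := l.AcquireWorkSlot(context.Background()); err != nil {
				return
			}
			defer l.ReleaseWorkSlot()
			fmt.Println("worker", i, "holds a slot")
		}(i)
	}
	wg.Wait()
}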
type rebuildTask struct {
limiter RebuildWorkerLimiter
action common.RebuildAction
}
type rebuilder struct {
mtx *sync.Mutex
wg *sync.WaitGroup
cancel func()
limiter RebuildWorkerLimiter
done chan struct{}
tasks chan rebuildTask
}
func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
return &rebuilder{
mtx: &sync.Mutex{},
wg: &sync.WaitGroup{},
limiter: l,
tasks: make(chan rebuildTask, 10),
}
}
func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
if r.done != nil {
return // already started
}
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.done = make(chan struct{})
r.wg.Add(1)
go func() {
defer r.wg.Done()
for {
select {
case <-r.done:
return
case t, ok := <-r.tasks:
if !ok {
continue
}
runRebuild(ctx, bs, mb, log, t.action, t.limiter)
}
}
}()
select {
case <-ctx.Done():
return
case r.tasks <- rebuildTask{
limiter: r.limiter,
action: common.RebuildAction{
SchemaChange: true,
},
}:
return
}
}
func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger,
action common.RebuildAction, limiter RebuildWorkerLimiter,
) {
select {
case <-ctx.Done():
return
default:
}
log.Info(logs.BlobstoreRebuildStarted)
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, action); err != nil {
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
} else {
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
}
}
func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, action common.RebuildAction,
) error {
select {
case <-ctx.Done():
return ctx.Err()
case r.tasks <- rebuildTask{
limiter: limiter,
action: action,
}:
return nil
}
}
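
A hypothetical in-package helper showing how a fill-percent rebuild could be queued through ScheduleRebuild (not part of this commit; r must already be started):

// scheduleFillPercentRebuild queues a one-off rebuild of DBs filled
// below percent, migrating with at most workers concurrent slots.
func scheduleFillPercentRebuild(ctx context.Context, r *rebuilder, workers uint32, percent int) error {
	return r.ScheduleRebuild(ctx, newRebuildLimiter(workers), common.RebuildAction{
		FillPercent:      true,
		FillPercentValue: percent,
	})
}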
func (r *rebuilder) Stop(log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
if r.done != nil {
close(r.done)
}
if r.cancel != nil {
r.cancel()
}
r.wg.Wait()
r.cancel = nil
r.done = nil
log.Info(logs.BlobstoreRebuildStopped)
}
var errMBIsNotAvailable = errors.New("metabase is not available")
type mbStorageIDUpdate struct {
mb *meta.DB
}
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if u.mb == nil {
return errMBIsNotAvailable
}
var prm meta.UpdateStorageIDPrm
prm.SetAddress(addr)
prm.SetStorageID(storageID)
_, err := u.mb.UpdateStorageID(ctx, prm)
return err
}

View file

@@ -1,13 +0,0 @@
package shard
import "context"
type RebuildWorkerLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
type noopRebuildLimiter struct{}
func (l *noopRebuildLimiter) AcquireWorkSlot(context.Context) error { return nil }
func (l *noopRebuildLimiter) ReleaseWorkSlot() {}

View file

@@ -1,98 +0,0 @@
package shard
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type rebuilder struct {
mtx *sync.Mutex
wg *sync.WaitGroup
cancel func()
limiter RebuildWorkerLimiter
}
func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
return &rebuilder{
mtx: &sync.Mutex{},
wg: &sync.WaitGroup{},
cancel: nil,
limiter: l,
}
}
func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.start(ctx, bs, mb, log)
}
func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
if r.cancel != nil {
r.stop(log)
}
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.wg.Add(1)
go func() {
defer r.wg.Done()
log.Info(logs.BlobstoreRebuildStarted)
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, r.limiter); err != nil {
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
} else {
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
}
}()
}
func (r *rebuilder) Stop(log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.stop(log)
}
func (r *rebuilder) stop(log *logger.Logger) {
if r.cancel == nil {
return
}
r.cancel()
r.wg.Wait()
r.cancel = nil
log.Info(logs.BlobstoreRebuildStopped)
}
var errMBIsNotAvailable = errors.New("metabase is not available")
type mbStorageIDUpdate struct {
mb *meta.DB
}
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if u.mb == nil {
return errMBIsNotAvailable
}
var prm meta.UpdateStorageIDPrm
prm.SetAddress(addr)
prm.SetStorageID(storageID)
_, err := u.mb.UpdateStorageID(ctx, prm)
return err
}

View file

@@ -147,7 +147,7 @@ func defaultCfg() *cfg {
log: &logger.Logger{Logger: zap.L()},
gcCfg: defaultGCCfg(),
reportErrorFunc: func(string, string, error) {},
-rebuildLimiter: &noopRebuildLimiter{},
+rebuildLimiter: newRebuildLimiter(1),
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
}