blobovnicza: Do not fail rebuild on objects larger than object size #1020

Merged
fyrchik merged 1 commit from dstepanov-yadro/frostfs-node:fix/rebuild_big_objects into master 2024-04-09 11:51:20 +00:00
3 changed files with 84 additions and 1 deletions

View file

@ -17,6 +17,8 @@ type PutPrm struct {
addr oid.Address
objData []byte
force bool
}
// PutRes groups the resulting values of Put operation.
@ -32,6 +34,11 @@ func (p *PutPrm) SetMarshaledObject(data []byte) {
p.objData = data
}
// SetForce sets force option.
func (p *PutPrm) SetForce(f bool) {
p.force = f
}
// Put saves an object in Blobovnicza.
//
// If binary representation of the object is not set,
@ -66,7 +73,15 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
// expected to happen:
// - before initialization step (incorrect usage by design)
// - if DB is corrupted (in future this case should be handled)
return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz))
// - blobovnicza's object size changed before rebuild (handled if prm.force flag specified)
if !prm.force {
return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz))
}
var err error
buck, err = tx.CreateBucket(bucketName)
if err != nil {
return fmt.Errorf("(%T) failed to create bucket for size %d: %w", b, sz, err)
}
}
// save the object in bucket

View file

@ -402,6 +402,7 @@ func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool,
var putPrm blobovnicza.PutPrm
putPrm.SetAddress(i.Address)
putPrm.SetMarshaledObject(i.ObjectData)
putPrm.SetForce(true)
_, err = target.Blobovnicza().Put(ctx, putPrm)
if err != nil {

View file

@ -38,6 +38,73 @@ func TestBlobovniczaTreeRebuild(t *testing.T) {
})
}
func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
t.Parallel()
dir := t.TempDir()
b := NewBlobovniczaTree(
WithLogger(test.NewLogger(t)),
WithObjectSizeLimit(64*1024), // 64KB object size limit
WithBlobovniczaShallowWidth(5),
WithBlobovniczaShallowDepth(2), // depth = 2
WithRootPath(dir),
WithBlobovniczaSize(100*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
obj := blobstortest.NewObject(64 * 1024) // 64KB object
data, err := obj.Marshal()
require.NoError(t, err)
var prm common.PutPrm
prm.Address = object.AddressOf(obj)
prm.RawData = data
res, err := b.Put(context.Background(), prm)
require.NoError(t, err)
storageIDs := make(map[oid.Address][]byte)
storageIDs[prm.Address] = res.StorageID
require.NoError(t, b.Close())
b = NewBlobovniczaTree(
WithLogger(test.NewLogger(t)),
WithObjectSizeLimit(32*1024), // 32KB object size limit
WithBlobovniczaShallowWidth(5),
WithBlobovniczaShallowDepth(3), // depth = 3
WithRootPath(dir),
WithBlobovniczaSize(100*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
metaStub := &storageIDUpdateStub{
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub
rPrm.WorkerLimiter = &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
require.True(t, dataMigrated)
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
require.NoError(t, b.Close())
}
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
dir := t.TempDir()
b := NewBlobovniczaTree(