blobovnicza: Do not fail rebuild on objects larger than object size #1020
3 changed files with 84 additions and 1 deletions
|
@ -17,6 +17,8 @@ type PutPrm struct {
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
|
|
||||||
objData []byte
|
objData []byte
|
||||||
|
|
||||||
|
force bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutRes groups the resulting values of Put operation.
|
// PutRes groups the resulting values of Put operation.
|
||||||
|
@ -32,6 +34,11 @@ func (p *PutPrm) SetMarshaledObject(data []byte) {
|
||||||
p.objData = data
|
p.objData = data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetForce sets force option.
|
||||||
|
func (p *PutPrm) SetForce(f bool) {
|
||||||
|
p.force = f
|
||||||
|
}
|
||||||
|
|
||||||
// Put saves an object in Blobovnicza.
|
// Put saves an object in Blobovnicza.
|
||||||
//
|
//
|
||||||
// If binary representation of the object is not set,
|
// If binary representation of the object is not set,
|
||||||
|
@ -66,7 +73,15 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
||||||
// expected to happen:
|
// expected to happen:
|
||||||
// - before initialization step (incorrect usage by design)
|
// - before initialization step (incorrect usage by design)
|
||||||
// - if DB is corrupted (in future this case should be handled)
|
// - if DB is corrupted (in future this case should be handled)
|
||||||
return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz))
|
// - blobovnicza's object size changed before rebuild (handled if prm.force flag specified)
|
||||||
|
if !prm.force {
|
||||||
|
return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz))
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
buck, err = tx.CreateBucket(bucketName)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("(%T) failed to create bucket for size %d: %w", b, sz, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// save the object in bucket
|
// save the object in bucket
|
||||||
|
|
|
@ -402,6 +402,7 @@ func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool,
|
||||||
var putPrm blobovnicza.PutPrm
|
var putPrm blobovnicza.PutPrm
|
||||||
putPrm.SetAddress(i.Address)
|
putPrm.SetAddress(i.Address)
|
||||||
putPrm.SetMarshaledObject(i.ObjectData)
|
putPrm.SetMarshaledObject(i.ObjectData)
|
||||||
|
putPrm.SetForce(true)
|
||||||
|
|
||||||
_, err = target.Blobovnicza().Put(ctx, putPrm)
|
_, err = target.Blobovnicza().Put(ctx, putPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -38,6 +38,73 @@ func TestBlobovniczaTreeRebuild(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
dir := t.TempDir()
|
||||||
|
b := NewBlobovniczaTree(
|
||||||
|
WithLogger(test.NewLogger(t)),
|
||||||
|
WithObjectSizeLimit(64*1024), // 64KB object size limit
|
||||||
|
WithBlobovniczaShallowWidth(5),
|
||||||
|
WithBlobovniczaShallowDepth(2), // depth = 2
|
||||||
|
WithRootPath(dir),
|
||||||
|
WithBlobovniczaSize(100*1024),
|
||||||
|
WithWaitBeforeDropDB(0),
|
||||||
|
WithOpenedCacheSize(1000),
|
||||||
|
WithMoveBatchSize(3))
|
||||||
|
require.NoError(t, b.Open(false))
|
||||||
|
require.NoError(t, b.Init())
|
||||||
|
|
||||||
|
obj := blobstortest.NewObject(64 * 1024) // 64KB object
|
||||||
|
data, err := obj.Marshal()
|
||||||
|
require.NoError(t, err)
|
||||||
|
var prm common.PutPrm
|
||||||
|
prm.Address = object.AddressOf(obj)
|
||||||
|
prm.RawData = data
|
||||||
|
res, err := b.Put(context.Background(), prm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
storageIDs := make(map[oid.Address][]byte)
|
||||||
|
storageIDs[prm.Address] = res.StorageID
|
||||||
|
|
||||||
|
require.NoError(t, b.Close())
|
||||||
|
|
||||||
|
b = NewBlobovniczaTree(
|
||||||
|
WithLogger(test.NewLogger(t)),
|
||||||
|
WithObjectSizeLimit(32*1024), // 32KB object size limit
|
||||||
|
WithBlobovniczaShallowWidth(5),
|
||||||
|
WithBlobovniczaShallowDepth(3), // depth = 3
|
||||||
|
WithRootPath(dir),
|
||||||
|
WithBlobovniczaSize(100*1024),
|
||||||
|
WithWaitBeforeDropDB(0),
|
||||||
|
WithOpenedCacheSize(1000),
|
||||||
|
WithMoveBatchSize(3))
|
||||||
|
require.NoError(t, b.Open(false))
|
||||||
|
require.NoError(t, b.Init())
|
||||||
|
|
||||||
|
metaStub := &storageIDUpdateStub{
|
||||||
|
storageIDs: storageIDs,
|
||||||
|
guard: &sync.Mutex{},
|
||||||
|
}
|
||||||
|
var rPrm common.RebuildPrm
|
||||||
|
rPrm.MetaStorage = metaStub
|
||||||
|
rPrm.WorkerLimiter = &rebuildLimiterStub{}
|
||||||
|
rRes, err := b.Rebuild(context.Background(), rPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||||
|
require.True(t, dataMigrated)
|
||||||
|
|
||||||
|
for addr, storageID := range storageIDs {
|
||||||
|
var gPrm common.GetPrm
|
||||||
|
gPrm.Address = addr
|
||||||
|
gPrm.StorageID = storageID
|
||||||
|
_, err := b.Get(context.Background(), gPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, b.Close())
|
||||||
|
}
|
||||||
|
|
||||||
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
|
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
b := NewBlobovniczaTree(
|
b := NewBlobovniczaTree(
|
||||||
|
|
Loading…
Reference in a new issue