blobovnicza: Do not fail rebuild on objects larger than object size #1020
3 changed files with 84 additions and 1 deletions
|
@ -17,6 +17,8 @@ type PutPrm struct {
|
|||
addr oid.Address
|
||||
|
||||
objData []byte
|
||||
|
||||
force bool
|
||||
}
|
||||
|
||||
// PutRes groups the resulting values of Put operation.
|
||||
|
@ -32,6 +34,11 @@ func (p *PutPrm) SetMarshaledObject(data []byte) {
|
|||
p.objData = data
|
||||
}
|
||||
|
||||
// SetForce sets force option.
|
||||
func (p *PutPrm) SetForce(f bool) {
|
||||
p.force = f
|
||||
}
|
||||
|
||||
// Put saves an object in Blobovnicza.
|
||||
//
|
||||
// If binary representation of the object is not set,
|
||||
|
@ -66,7 +73,15 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) {
|
|||
// expected to happen:
|
||||
// - before initialization step (incorrect usage by design)
|
||||
// - if DB is corrupted (in future this case should be handled)
|
||||
return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz))
|
||||
// - blobovnicza's object size changed before rebuild (handled if prm.force flag specified)
|
||||
if !prm.force {
|
||||
return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz))
|
||||
}
|
||||
var err error
|
||||
buck, err = tx.CreateBucket(bucketName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("(%T) failed to create bucket for size %d: %w", b, sz, err)
|
||||
}
|
||||
}
|
||||
|
||||
// save the object in bucket
|
||||
|
|
|
@ -402,6 +402,7 @@ func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool,
|
|||
var putPrm blobovnicza.PutPrm
|
||||
putPrm.SetAddress(i.Address)
|
||||
putPrm.SetMarshaledObject(i.ObjectData)
|
||||
putPrm.SetForce(true)
|
||||
|
||||
_, err = target.Blobovnicza().Put(ctx, putPrm)
|
||||
if err != nil {
|
||||
|
|
|
@ -38,6 +38,73 @@ func TestBlobovniczaTreeRebuild(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dir := t.TempDir()
|
||||
b := NewBlobovniczaTree(
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(64*1024), // 64KB object size limit
|
||||
WithBlobovniczaShallowWidth(5),
|
||||
WithBlobovniczaShallowDepth(2), // depth = 2
|
||||
WithRootPath(dir),
|
||||
WithBlobovniczaSize(100*1024),
|
||||
WithWaitBeforeDropDB(0),
|
||||
WithOpenedCacheSize(1000),
|
||||
WithMoveBatchSize(3))
|
||||
require.NoError(t, b.Open(false))
|
||||
require.NoError(t, b.Init())
|
||||
|
||||
obj := blobstortest.NewObject(64 * 1024) // 64KB object
|
||||
data, err := obj.Marshal()
|
||||
require.NoError(t, err)
|
||||
var prm common.PutPrm
|
||||
prm.Address = object.AddressOf(obj)
|
||||
prm.RawData = data
|
||||
res, err := b.Put(context.Background(), prm)
|
||||
require.NoError(t, err)
|
||||
|
||||
storageIDs := make(map[oid.Address][]byte)
|
||||
storageIDs[prm.Address] = res.StorageID
|
||||
|
||||
require.NoError(t, b.Close())
|
||||
|
||||
b = NewBlobovniczaTree(
|
||||
WithLogger(test.NewLogger(t)),
|
||||
WithObjectSizeLimit(32*1024), // 32KB object size limit
|
||||
WithBlobovniczaShallowWidth(5),
|
||||
WithBlobovniczaShallowDepth(3), // depth = 3
|
||||
WithRootPath(dir),
|
||||
WithBlobovniczaSize(100*1024),
|
||||
WithWaitBeforeDropDB(0),
|
||||
WithOpenedCacheSize(1000),
|
||||
WithMoveBatchSize(3))
|
||||
require.NoError(t, b.Open(false))
|
||||
require.NoError(t, b.Init())
|
||||
|
||||
metaStub := &storageIDUpdateStub{
|
||||
storageIDs: storageIDs,
|
||||
guard: &sync.Mutex{},
|
||||
}
|
||||
var rPrm common.RebuildPrm
|
||||
rPrm.MetaStorage = metaStub
|
||||
rPrm.WorkerLimiter = &rebuildLimiterStub{}
|
||||
rRes, err := b.Rebuild(context.Background(), rPrm)
|
||||
require.NoError(t, err)
|
||||
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||
require.True(t, dataMigrated)
|
||||
|
||||
for addr, storageID := range storageIDs {
|
||||
var gPrm common.GetPrm
|
||||
gPrm.Address = addr
|
||||
gPrm.StorageID = storageID
|
||||
_, err := b.Get(context.Background(), gPrm)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, b.Close())
|
||||
}
|
||||
|
||||
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
|
||||
dir := t.TempDir()
|
||||
b := NewBlobovniczaTree(
|
||||
|
|
Loading…
Reference in a new issue