diff --git a/pkg/local_object_storage/blobovnicza/put.go b/pkg/local_object_storage/blobovnicza/put.go index d15b6e2b..9af8a93f 100644 --- a/pkg/local_object_storage/blobovnicza/put.go +++ b/pkg/local_object_storage/blobovnicza/put.go @@ -17,6 +17,8 @@ type PutPrm struct { addr oid.Address objData []byte + + force bool } // PutRes groups the resulting values of Put operation. @@ -32,6 +34,11 @@ func (p *PutPrm) SetMarshaledObject(data []byte) { p.objData = data } +// SetForce sets force option. +func (p *PutPrm) SetForce(f bool) { + p.force = f +} + // Put saves an object in Blobovnicza. // // If binary representation of the object is not set, @@ -66,7 +73,15 @@ func (b *Blobovnicza) Put(ctx context.Context, prm PutPrm) (PutRes, error) { // expected to happen: // - before initialization step (incorrect usage by design) // - if DB is corrupted (in future this case should be handled) - return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz)) + // - blobovnicza's object size changed before rebuild (handled if prm.force flag specified) + if !prm.force { + return logicerr.Wrap(fmt.Errorf("(%T) bucket for size %d not created", b, sz)) + } + var err error + buck, err = tx.CreateBucket(bucketName) + if err != nil { + return fmt.Errorf("(%T) failed to create bucket for size %d: %w", b, sz, err) + } } // save the object in bucket diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go index 2302661d..93ef8ba2 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go @@ -402,6 +402,7 @@ func (i *moveIterator) tryMoveToLvl(ctx context.Context, lvlPath string) (bool, var putPrm blobovnicza.PutPrm putPrm.SetAddress(i.Address) putPrm.SetMarshaledObject(i.ObjectData) + putPrm.SetForce(true) _, err = target.Blobovnicza().Put(ctx, putPrm) if err != nil { diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go index 0ac50f36..578cb1a7 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go @@ -38,6 +38,73 @@ func TestBlobovniczaTreeRebuild(t *testing.T) { }) } +func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + b := NewBlobovniczaTree( + WithLogger(test.NewLogger(t)), + WithObjectSizeLimit(64*1024), // 64KB object size limit + WithBlobovniczaShallowWidth(5), + WithBlobovniczaShallowDepth(2), // depth = 2 + WithRootPath(dir), + WithBlobovniczaSize(100*1024), + WithWaitBeforeDropDB(0), + WithOpenedCacheSize(1000), + WithMoveBatchSize(3)) + require.NoError(t, b.Open(false)) + require.NoError(t, b.Init()) + + obj := blobstortest.NewObject(64 * 1024) // 64KB object + data, err := obj.Marshal() + require.NoError(t, err) + var prm common.PutPrm + prm.Address = object.AddressOf(obj) + prm.RawData = data + res, err := b.Put(context.Background(), prm) + require.NoError(t, err) + + storageIDs := make(map[oid.Address][]byte) + storageIDs[prm.Address] = res.StorageID + + require.NoError(t, b.Close()) + + b = NewBlobovniczaTree( + WithLogger(test.NewLogger(t)), + WithObjectSizeLimit(32*1024), // 32KB object size limit + WithBlobovniczaShallowWidth(5), + WithBlobovniczaShallowDepth(3), // depth = 3 + WithRootPath(dir), + WithBlobovniczaSize(100*1024), + WithWaitBeforeDropDB(0), + WithOpenedCacheSize(1000), + WithMoveBatchSize(3)) + require.NoError(t, b.Open(false)) + require.NoError(t, b.Init()) + + metaStub := &storageIDUpdateStub{ + storageIDs: storageIDs, + guard: &sync.Mutex{}, + } + var rPrm common.RebuildPrm + rPrm.MetaStorage = metaStub + rPrm.WorkerLimiter = &rebuildLimiterStub{} + rRes, err := b.Rebuild(context.Background(), rPrm) + require.NoError(t, err) + dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 + require.True(t, dataMigrated) + + for addr, storageID := range storageIDs { + var gPrm common.GetPrm + gPrm.Address = addr + gPrm.StorageID = storageID + _, err := b.Get(context.Background(), gPrm) + require.NoError(t, err) + } + + require.NoError(t, b.Close()) +} + func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) { dir := t.TempDir() b := NewBlobovniczaTree(