forked from TrueCloudLab/frostfs-node
Evgenii Stratonikov
225fe2d4d5
Down from 3s to 300ms. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
145 lines
3.9 KiB
Go
145 lines
3.9 KiB
Go
package blobovniczatree
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"testing"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
func TestBlobovniczaTreeRebuild(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
t.Run("width increased", func(t *testing.T) {
|
|
t.Parallel()
|
|
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 3, false)
|
|
})
|
|
|
|
t.Run("width reduced", func(t *testing.T) {
|
|
t.Parallel()
|
|
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 1, true)
|
|
})
|
|
|
|
t.Run("depth increased", func(t *testing.T) {
|
|
t.Parallel()
|
|
testBlobovniczaTreeRebuildHelper(t, 1, 2, 2, 2, true)
|
|
})
|
|
|
|
t.Run("depth reduced", func(t *testing.T) {
|
|
t.Parallel()
|
|
testBlobovniczaTreeRebuildHelper(t, 2, 2, 1, 2, true)
|
|
})
|
|
}
|
|
|
|
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
|
|
dir := t.TempDir()
|
|
b := NewBlobovniczaTree(
|
|
WithLogger(test.NewLogger(t, true)),
|
|
WithObjectSizeLimit(2048),
|
|
WithBlobovniczaShallowWidth(sourceWidth),
|
|
WithBlobovniczaShallowDepth(sourceDepth),
|
|
WithRootPath(dir),
|
|
WithBlobovniczaSize(100*1024),
|
|
WithWaitBeforeDropDB(0),
|
|
WithOpenedCacheSize(1000),
|
|
WithMoveBatchSize(3))
|
|
require.NoError(t, b.Open(false))
|
|
require.NoError(t, b.Init())
|
|
|
|
eg, egCtx := errgroup.WithContext(context.Background())
|
|
storageIDs := make(map[oid.Address][]byte)
|
|
storageIDsGuard := &sync.Mutex{}
|
|
for i := 0; i < 100; i++ {
|
|
eg.Go(func() error {
|
|
obj := blobstortest.NewObject(1024)
|
|
data, err := obj.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var prm common.PutPrm
|
|
prm.Address = object.AddressOf(obj)
|
|
prm.RawData = data
|
|
res, err := b.Put(egCtx, prm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
storageIDsGuard.Lock()
|
|
storageIDs[prm.Address] = res.StorageID
|
|
storageIDsGuard.Unlock()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
require.NoError(t, eg.Wait())
|
|
require.NoError(t, b.Close())
|
|
|
|
b = NewBlobovniczaTree(
|
|
WithLogger(test.NewLogger(t, true)),
|
|
WithObjectSizeLimit(2048),
|
|
WithBlobovniczaShallowWidth(targetWidth),
|
|
WithBlobovniczaShallowDepth(targetDepth),
|
|
WithRootPath(dir),
|
|
WithBlobovniczaSize(100*1024),
|
|
WithWaitBeforeDropDB(0),
|
|
WithOpenedCacheSize(1000),
|
|
WithMoveBatchSize(50))
|
|
require.NoError(t, b.Open(false))
|
|
require.NoError(t, b.Init())
|
|
|
|
for addr, storageID := range storageIDs {
|
|
var gPrm common.GetPrm
|
|
gPrm.Address = addr
|
|
gPrm.StorageID = storageID
|
|
_, err := b.Get(context.Background(), gPrm)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
metaStub := &storageIDUpdateStub{
|
|
storageIDs: storageIDs,
|
|
guard: &sync.Mutex{},
|
|
}
|
|
var rPrm common.RebuildPrm
|
|
rPrm.MetaStorage = metaStub
|
|
rPrm.WorkerLimiter = &rebuildLimiterStub{}
|
|
rRes, err := b.Rebuild(context.Background(), rPrm)
|
|
require.NoError(t, err)
|
|
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
|
require.Equal(t, shouldMigrate, dataMigrated)
|
|
|
|
for addr, storageID := range storageIDs {
|
|
var gPrm common.GetPrm
|
|
gPrm.Address = addr
|
|
gPrm.StorageID = storageID
|
|
_, err := b.Get(context.Background(), gPrm)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
require.NoError(t, b.Close())
|
|
}
|
|
|
|
// storageIDUpdateStub records storage-ID updates issued during Rebuild; it is
// plugged in as rPrm.MetaStorage by the rebuild test.
type storageIDUpdateStub struct {
	guard *sync.Mutex // protects storageIDs and updatedCount
	// last known storage ID per object address; shared with the test,
	// which later reads objects back by these IDs
	storageIDs map[oid.Address][]byte
	// total number of UpdateStorageID calls; >0 means data migrated
	updatedCount uint64
}
|
func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
|
|
s.guard.Lock()
|
|
defer s.guard.Unlock()
|
|
|
|
s.storageIDs[addr] = storageID
|
|
s.updatedCount++
|
|
return nil
|
|
}
|
|
|
|
// rebuildLimiterStub is a no-op worker limiter: acquiring a work slot always
// succeeds immediately, so the rebuild runs without throttling. It is plugged
// in as rPrm.WorkerLimiter by the rebuild test.
type rebuildLimiterStub struct{}

func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
func (s *rebuildLimiterStub) ReleaseWorkSlot() {}