frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go

146 lines
3.9 KiB
Go

package blobovniczatree
import (
"context"
"sync"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestBlobovniczaTreeRebuild(t *testing.T) {
t.Parallel()
t.Run("width increased", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 3, false)
})
t.Run("width reduced", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 2, 1, true)
})
t.Run("depth increased", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 1, 2, 2, 2, true)
})
t.Run("depth reduced", func(t *testing.T) {
t.Parallel()
testBlobovniczaTreeRebuildHelper(t, 2, 2, 1, 2, true)
})
}
func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, targetDepth, targetWidth uint64, shouldMigrate bool) {
dir := t.TempDir()
b := NewBlobovniczaTree(
WithLogger(test.NewLogger(t)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(sourceWidth),
WithBlobovniczaShallowDepth(sourceDepth),
WithRootPath(dir),
WithBlobovniczaSize(100*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
eg, egCtx := errgroup.WithContext(context.Background())
storageIDs := make(map[oid.Address][]byte)
storageIDsGuard := &sync.Mutex{}
for i := 0; i < 100; i++ {
eg.Go(func() error {
obj := blobstortest.NewObject(1024)
data, err := obj.Marshal()
if err != nil {
return err
}
var prm common.PutPrm
prm.Address = object.AddressOf(obj)
prm.RawData = data
res, err := b.Put(egCtx, prm)
if err != nil {
return err
}
storageIDsGuard.Lock()
storageIDs[prm.Address] = res.StorageID
storageIDsGuard.Unlock()
return nil
})
}
require.NoError(t, eg.Wait())
require.NoError(t, b.Close())
b = NewBlobovniczaTree(
WithLogger(test.NewLogger(t)),
WithObjectSizeLimit(2048),
WithBlobovniczaShallowWidth(targetWidth),
WithBlobovniczaShallowDepth(targetDepth),
WithRootPath(dir),
WithBlobovniczaSize(100*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000),
WithMoveBatchSize(50))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
metaStub := &storageIDUpdateStub{
storageIDs: storageIDs,
guard: &sync.Mutex{},
}
var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub
rPrm.WorkerLimiter = &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
require.Equal(t, shouldMigrate, dataMigrated)
for addr, storageID := range storageIDs {
var gPrm common.GetPrm
gPrm.Address = addr
gPrm.StorageID = storageID
_, err := b.Get(context.Background(), gPrm)
require.NoError(t, err)
}
require.NoError(t, b.Close())
}
type storageIDUpdateStub struct {
guard *sync.Mutex
storageIDs map[oid.Address][]byte
updatedCount uint64
}
func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
s.guard.Lock()
defer s.guard.Unlock()
s.storageIDs[addr] = storageID
s.updatedCount++
return nil
}
type rebuildLimiterStub struct{}
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
func (s *rebuildLimiterStub) ReleaseWorkSlot() {}