[#116] node: Improve shard/engine construction in tests
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful

* Introduce testEngineWrapper that can be constructed with different options

Signed-off-by: Airat Arifullin a.arifullin@yadro.com
This commit is contained in:
Airat Arifullin 2023-03-30 14:58:20 +03:00
parent a69c6d1ec9
commit 21f7aae37f
10 changed files with 173 additions and 144 deletions

View file

@ -192,7 +192,8 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
} }
func TestExecBlocks(t *testing.T) { func TestExecBlocks(t *testing.T) {
e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many e := testNewEngine(t).setShardsNum(t, 2).engine // number doesn't matter in this test, 2 is several but not many
t.Cleanup(func() { t.Cleanup(func() {
os.RemoveAll(t.Name()) os.RemoveAll(t.Name())
}) })
@ -314,25 +315,26 @@ func TestReload(t *testing.T) {
// engineWithShards creates engine with specified number of shards. Returns // engineWithShards creates engine with specified number of shards. Returns
// slice of paths to their metabase and the engine. // slice of paths to their metabase and the engine.
// TODO: #1776 unify engine construction in tests
func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []string) { func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []string) {
addPath := filepath.Join(path, "add") addPath := filepath.Join(path, "add")
currShards := make([]string, 0, num) currShards := make([]string, 0, num)
e := New() te := testNewEngine(t).
for i := 0; i < num; i++ { setShardsNumAdditionalOpts(t, num, func(id int) []shard.Option {
id, err := e.AddShard( return []shard.Option{
shard.WithBlobStorOptions( shard.WithBlobStorOptions(
blobstor.WithStorages(newStorages(filepath.Join(addPath, strconv.Itoa(i)), errSmallSize))), blobstor.WithStorages(newStorages(filepath.Join(addPath, strconv.Itoa(id)), errSmallSize))),
shard.WithMetaBaseOptions( shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(addPath, fmt.Sprintf("%d.metabase", i))), meta.WithPath(filepath.Join(addPath, fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0700), meta.WithPermissions(0700),
meta.WithEpochState(epochState{}), meta.WithEpochState(epochState{}),
), ),
) }
require.NoError(t, err) })
e, ids := te.engine, te.shardIDs
for _, id := range ids {
currShards = append(currShards, calculateShardID(e.shards[id.String()].DumpInfo())) currShards = append(currShards, calculateShardID(e.shards[id.String()].DumpInfo()))
} }

View file

@ -53,7 +53,7 @@ func TestDeleteBigObject(t *testing.T) {
s2 := testNewShard(t, 2) s2 := testNewShard(t, 2)
s3 := testNewShard(t, 3) s3 := testNewShard(t, 3)
e := testNewEngineWithShards(t, s1, s2, s3) e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
e.log = &logger.Logger{Logger: zaptest.NewLogger(t)} e.log = &logger.Logger{Logger: zaptest.NewLogger(t)}
defer e.Close() defer e.Close()

View file

@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zaptest"
) )
type epochState struct{} type epochState struct{}
@ -50,7 +51,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
shards[i] = testNewShard(b, i) shards[i] = testNewShard(b, i)
} }
e := testNewEngineWithShards(b, shards...) e := testNewEngine(b).setInitializedShards(b, shards...).engine
b.Cleanup(func() { b.Cleanup(func() {
_ = e.Close() _ = e.Close()
_ = os.RemoveAll(b.Name()) _ = os.RemoveAll(b.Name())
@ -75,24 +76,68 @@ func benchmarkExists(b *testing.B, shardNum int) {
} }
} }
func testNewEngineWithShards(t testing.TB, shards ...*shard.Shard) *StorageEngine { type testEngineWrapper struct {
engine := New() engine *StorageEngine
shardIDs []*shard.ID
}
func testNewEngine(t testing.TB, opts ...Option) *testEngineWrapper {
engine := New(WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}))
for _, opt := range opts {
opt(engine.cfg)
}
return &testEngineWrapper{
engine: engine,
}
}
func (te *testEngineWrapper) setInitializedShards(t testing.TB, shards ...*shard.Shard) *testEngineWrapper {
for _, s := range shards { for _, s := range shards {
pool, err := ants.NewPool(10, ants.WithNonblocking(true)) pool, err := ants.NewPool(10, ants.WithNonblocking(true))
require.NoError(t, err) require.NoError(t, err)
engine.shards[s.ID().String()] = hashedShard{ te.engine.shards[s.ID().String()] = hashedShard{
shardWrapper: shardWrapper{ shardWrapper: shardWrapper{
errorCount: atomic.NewUint32(0), errorCount: atomic.NewUint32(0),
Shard: s, Shard: s,
}, },
hash: hrw.Hash([]byte(s.ID().String())), hash: hrw.Hash([]byte(s.ID().String())),
} }
engine.shardPools[s.ID().String()] = pool te.engine.shardPools[s.ID().String()] = pool
te.shardIDs = append(te.shardIDs, s.ID())
}
return te
}
func (te *testEngineWrapper) setShardsNum(t testing.TB, num int) *testEngineWrapper {
shards := make([]*shard.Shard, 0, num)
for i := 0; i < num; i++ {
shards = append(shards, testNewShard(t, i))
} }
return engine return te.setInitializedShards(t, shards...)
}
func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
for i := 0; i < num; i++ {
opts := shardOpts(i)
id, err := te.engine.AddShard(opts...)
require.NoError(t, err)
te.shardIDs = append(te.shardIDs, id)
}
return te
}
func (te *testEngineWrapper) setShardsNumAdditionalOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
for i := 0; i < num; i++ {
defaultOpts := testDefaultShardOptions(t, i)
opts := append(defaultOpts, shardOpts(i)...)
id, err := te.engine.AddShard(opts...)
require.NoError(t, err)
te.shardIDs = append(te.shardIDs, id)
}
return te
} }
func newStorages(root string, smallSize uint64) []blobstor.SubStorage { func newStorages(root string, smallSize uint64) []blobstor.SubStorage {
@ -145,8 +190,17 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
sid, err := generateShardID() sid, err := generateShardID()
require.NoError(t, err) require.NoError(t, err)
s := shard.New( shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...)
shard.WithID(sid), s := shard.New(shardOpts...)
require.NoError(t, s.Open())
require.NoError(t, s.Init(context.Background()))
return s
}
func testDefaultShardOptions(t testing.TB, id int) []shard.Option {
return []shard.Option{
shard.WithLogger(&logger.Logger{Logger: zap.L()}), shard.WithLogger(&logger.Logger{Logger: zap.L()}),
shard.WithBlobStorOptions( shard.WithBlobStorOptions(
blobstor.WithStorages( blobstor.WithStorages(
@ -157,46 +211,5 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("%d.metabase", id))), meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0700), meta.WithPermissions(0700),
meta.WithEpochState(epochState{}), meta.WithEpochState(epochState{}),
)) )}
require.NoError(t, s.Open())
require.NoError(t, s.Init(context.Background()))
return s
}
func testEngineFromShardOpts(t *testing.T, num int, extraOpts []shard.Option) *StorageEngine {
engine := New()
for i := 0; i < num; i++ {
_, err := engine.AddShard(append([]shard.Option{
shard.WithBlobStorOptions(
blobstor.WithStorages(
newStorages(filepath.Join(t.Name(), fmt.Sprintf("blobstor%d", i)),
1<<20)),
),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("metabase%d", i))),
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
),
shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(t.Name(), fmt.Sprintf("pilorama%d", i)))),
}, extraOpts...)...)
require.NoError(t, err)
}
require.NoError(t, engine.Open())
require.NoError(t, engine.Init(context.Background()))
return engine
}
func testNewEngineWithShardNum(t *testing.T, num int) *StorageEngine {
shards := make([]*shard.Shard, 0, num)
for i := 0; i < num; i++ {
shards = append(shards, testNewShard(t, i))
}
return testNewEngineWithShards(t, shards...)
} }

View file

@ -48,37 +48,39 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
t.Cleanup(func() { _ = os.RemoveAll(dir) }) t.Cleanup(func() { _ = os.RemoveAll(dir) })
} }
e := New(
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
WithShardPoolSize(1),
WithErrorThreshold(errThreshold))
var testShards [2]*testShard var testShards [2]*testShard
for i := range testShards { te := testNewEngine(t,
storages, smallFileStorage, largeFileStorage := newTestStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize) WithShardPoolSize(1),
id, err := e.AddShard( WithErrorThreshold(errThreshold),
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), ).
shard.WithBlobStorOptions(blobstor.WithStorages(storages)), setShardsNumOpts(t, 2, func(id int) []shard.Option {
shard.WithMetaBaseOptions( storages, smallFileStorage, largeFileStorage := newTestStorages(filepath.Join(dir, strconv.Itoa(id)), errSmallSize)
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), testShards[id] = &testShard{
meta.WithPermissions(0700), smallFileStorage: smallFileStorage,
meta.WithEpochState(epochState{}), largeFileStorage: largeFileStorage,
), }
shard.WithPiloramaOptions( return []shard.Option{
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))), shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
pilorama.WithPerm(0700))) shard.WithBlobStorOptions(blobstor.WithStorages(storages)),
require.NoError(t, err) shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))),
testShards[i] = &testShard{ meta.WithPermissions(0700),
id: id, meta.WithEpochState(epochState{}),
smallFileStorage: smallFileStorage, ),
largeFileStorage: largeFileStorage, shard.WithPiloramaOptions(
} pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", id))),
} pilorama.WithPerm(0700)),
}
})
e := te.engine
require.NoError(t, e.Open()) require.NoError(t, e.Open())
require.NoError(t, e.Init(context.Background())) require.NoError(t, e.Init(context.Background()))
for i, id := range te.shardIDs {
testShards[i].id = id
}
return &testEngine{ return &testEngine{
ng: e, ng: e,
dir: dir, dir: dir,

View file

@ -29,28 +29,23 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(dir) }) t.Cleanup(func() { _ = os.RemoveAll(dir) })
e := New( te := testNewEngine(t, WithShardPoolSize(1)).
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), setShardsNumOpts(t, shardNum, func(id int) []shard.Option {
WithShardPoolSize(1)) return []shard.Option{
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
ids := make([]*shard.ID, shardNum) shard.WithBlobStorOptions(
blobstor.WithStorages([]blobstor.SubStorage{{
for i := range ids { Storage: fstree.New(
ids[i], err = e.AddShard( fstree.WithPath(filepath.Join(dir, strconv.Itoa(id))),
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}), fstree.WithDepth(1)),
shard.WithBlobStorOptions( }})),
blobstor.WithStorages([]blobstor.SubStorage{{ shard.WithMetaBaseOptions(
Storage: fstree.New( meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))),
fstree.WithPath(filepath.Join(dir, strconv.Itoa(i))), meta.WithPermissions(0700),
fstree.WithDepth(1)), meta.WithEpochState(epochState{})),
}})), }
shard.WithMetaBaseOptions( })
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), e, ids := te.engine, te.shardIDs
meta.WithPermissions(0700),
meta.WithEpochState(epochState{}),
))
require.NoError(t, err)
}
require.NoError(t, e.Open()) require.NoError(t, e.Open())
require.NoError(t, e.Init(context.Background())) require.NoError(t, e.Init(context.Background()))

View file

@ -44,7 +44,7 @@ func TestHeadRaw(t *testing.T) {
s1 := testNewShard(t, 1) s1 := testNewShard(t, 1)
s2 := testNewShard(t, 2) s2 := testNewShard(t, 2)
e := testNewEngineWithShards(t, s1, s2) e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
defer e.Close() defer e.Close()
var putPrmLeft shard.PutPrm var putPrmLeft shard.PutPrm

View file

@ -38,7 +38,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
link.SetSplitID(splitID) link.SetSplitID(splitID)
t.Run("delete small object", func(t *testing.T) { t.Run("delete small object", func(t *testing.T) {
e := testNewEngineWithShardNum(t, 1) e := testNewEngine(t).setShardsNum(t, 1).engine
defer e.Close() defer e.Close()
err := Put(e, parent) err := Put(e, parent)
@ -59,7 +59,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
s1 := testNewShard(t, 1) s1 := testNewShard(t, 1)
s2 := testNewShard(t, 2) s2 := testNewShard(t, 2)
e := testNewEngineWithShards(t, s1, s2) e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
defer e.Close() defer e.Close()
var putChild shard.PutPrm var putChild shard.PutPrm

View file

@ -16,7 +16,7 @@ import (
func TestListWithCursor(t *testing.T) { func TestListWithCursor(t *testing.T) {
s1 := testNewShard(t, 1) s1 := testNewShard(t, 1)
s2 := testNewShard(t, 2) s2 := testNewShard(t, 2)
e := testNewEngineWithShards(t, s1, s2) e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
t.Cleanup(func() { t.Cleanup(func() {
e.Close() e.Close()

View file

@ -45,15 +45,21 @@ func TestLockUserScenario(t *testing.T) {
tombForLockID := oidtest.ID() tombForLockID := oidtest.ID()
tombObj.SetID(tombForLockID) tombObj.SetID(tombForLockID)
e := testEngineFromShardOpts(t, 2, []shard.Option{ testEngine := testNewEngine(t).
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
pool, err := ants.NewPool(sz) return []shard.Option{
require.NoError(t, err) shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool return pool
}), }),
shard.WithTombstoneSource(tss{lockerExpiresAfter}), shard.WithTombstoneSource(tss{lockerExpiresAfter}),
}) }
})
e := testEngine.engine
require.NoError(t, e.Open())
require.NoError(t, e.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
_ = e.Close() _ = e.Close()
@ -146,14 +152,20 @@ func TestLockExpiration(t *testing.T) {
// 3. lock expiration epoch is coming // 3. lock expiration epoch is coming
// 4. after some delay the object is not locked anymore // 4. after some delay the object is not locked anymore
e := testEngineFromShardOpts(t, 2, []shard.Option{ testEngine := testNewEngine(t).
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
pool, err := ants.NewPool(sz) return []shard.Option{
require.NoError(t, err) shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
require.NoError(t, err)
return pool return pool
}), }),
}) }
})
e := testEngine.engine
require.NoError(t, e.Open())
require.NoError(t, e.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
_ = e.Close() _ = e.Close()
@ -218,16 +230,20 @@ func TestLockForceRemoval(t *testing.T) {
// 5. the object is not locked anymore // 5. the object is not locked anymore
var e *StorageEngine var e *StorageEngine
e = testEngineFromShardOpts(t, 2, []shard.Option{ e = testNewEngine(t).
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
pool, err := ants.NewPool(sz) return []shard.Option{
require.NoError(t, err) shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
return pool require.NoError(t, err)
}),
shard.WithDeletedLockCallback(e.processDeletedLocks),
})
return pool
}),
shard.WithDeletedLockCallback(e.processDeletedLocks),
}
}).engine
require.NoError(t, e.Open())
require.NoError(t, e.Init(context.Background()))
t.Cleanup(func() { t.Cleanup(func() {
_ = e.Close() _ = e.Close()
_ = os.RemoveAll(t.Name()) _ = os.RemoveAll(t.Name())

View file

@ -10,7 +10,8 @@ import (
func TestRemoveShard(t *testing.T) { func TestRemoveShard(t *testing.T) {
const numOfShards = 6 const numOfShards = 6
e := testNewEngineWithShardNum(t, numOfShards) te := testNewEngine(t).setShardsNum(t, numOfShards)
e, ids := te.engine, te.shardIDs
t.Cleanup(func() { t.Cleanup(func() {
e.Close() e.Close()
os.RemoveAll(t.Name()) os.RemoveAll(t.Name())
@ -22,12 +23,12 @@ func TestRemoveShard(t *testing.T) {
removedNum := numOfShards / 2 removedNum := numOfShards / 2
mSh := make(map[string]bool, numOfShards) mSh := make(map[string]bool, numOfShards)
for i, sh := range e.DumpInfo().Shards { for i, id := range ids {
if i == removedNum { if i == removedNum {
break break
} }
mSh[sh.ID.String()] = true mSh[id.String()] = true
} }
for id, remove := range mSh { for id, remove := range mSh {