[#116] node: Improve shard/engine construction in tests #190
|
@ -192,7 +192,8 @@ func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Optio
|
|||
}
|
||||
|
||||
func TestExecBlocks(t *testing.T) {
|
||||
e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many
|
||||
e := testNewEngine(t).setShardsNum(t, 2).engine // number doesn't matter in this test, 2 is several but not many
|
||||
|
||||
t.Cleanup(func() {
|
||||
os.RemoveAll(t.Name())
|
||||
})
|
||||
|
@ -314,25 +315,26 @@ func TestReload(t *testing.T) {
|
|||
|
||||
// engineWithShards creates engine with specified number of shards. Returns
|
||||
// slice of paths to their metabase and the engine.
|
||||
// TODO: #1776 unify engine construction in tests
|
||||
func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []string) {
|
||||
addPath := filepath.Join(path, "add")
|
||||
|
||||
currShards := make([]string, 0, num)
|
||||
|
||||
e := New()
|
||||
for i := 0; i < num; i++ {
|
||||
id, err := e.AddShard(
|
||||
te := testNewEngine(t).
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
setShardsNumAdditionalOpts(t, num, func(id int) []shard.Option {
|
||||
return []shard.Option{
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithStorages(newStorages(filepath.Join(addPath, strconv.Itoa(i)), errSmallSize))),
|
||||
blobstor.WithStorages(newStorages(filepath.Join(addPath, strconv.Itoa(id)), errSmallSize))),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(filepath.Join(addPath, fmt.Sprintf("%d.metabase", i))),
|
||||
meta.WithPath(filepath.Join(addPath, fmt.Sprintf("%d.metabase", id))),
|
||||
meta.WithPermissions(0700),
|
||||
meta.WithEpochState(epochState{}),
|
||||
),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
})
|
||||
e, ids := te.engine, te.shardIDs
|
||||
|
||||
for _, id := range ids {
|
||||
currShards = append(currShards, calculateShardID(e.shards[id.String()].DumpInfo()))
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ func TestDeleteBigObject(t *testing.T) {
|
|||
s2 := testNewShard(t, 2)
|
||||
s3 := testNewShard(t, 3)
|
||||
|
||||
e := testNewEngineWithShards(t, s1, s2, s3)
|
||||
e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
|
||||
e.log = &logger.Logger{Logger: zaptest.NewLogger(t)}
|
||||
defer e.Close()
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
type epochState struct{}
|
||||
|
@ -50,7 +51,7 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
|||
shards[i] = testNewShard(b, i)
|
||||
}
|
||||
|
||||
e := testNewEngineWithShards(b, shards...)
|
||||
e := testNewEngine(b).setInitializedShards(b, shards...).engine
|
||||
b.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
_ = os.RemoveAll(b.Name())
|
||||
|
@ -75,24 +76,68 @@ func benchmarkExists(b *testing.B, shardNum int) {
|
|||
}
|
||||
}
|
||||
|
||||
func testNewEngineWithShards(t testing.TB, shards ...*shard.Shard) *StorageEngine {
|
||||
engine := New()
|
||||
type testEngineWrapper struct {
|
||||
engine *StorageEngine
|
||||
shardIDs []*shard.ID
|
||||
}
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
We already have We already have `shards` in the engine itself, duplicating them here can lead to unexpected behaviour when writing some concurrent tests in future. Can we return them from a method? Or does this field has some special meaning?
aarifullin
commented
This rather has supportive meaning. It is needed to keep shardIDs in the same order as they are added to engine. Also this allows to not iterate over engine shards and get shardIDs immediatly This rather has supportive meaning. It is needed to keep shardIDs in the same order as they are added to engine. Also this allows to not iterate over engine shards and get shardIDs immediatly
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Feels unnecessary here, what value does it provide? Feels unnecessary here, what value does it provide?
aarifullin
commented
Removed it Removed it
|
||||
func testNewEngine(t testing.TB, opts ...Option) *testEngineWrapper {
|
||||
engine := New(WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}))
|
||||
for _, opt := range opts {
|
||||
opt(engine.cfg)
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Not worth a method IMO, Not worth a method IMO, `.engine` is just as fine and `Engine()` makes me wondering how is it different from the `RunEngine()`
|
||||
}
|
||||
return &testEngineWrapper{
|
||||
engine: engine,
|
||||
}
|
||||
}
|
||||
|
||||
func (te *testEngineWrapper) setInitializedShards(t testing.TB, shards ...*shard.Shard) *testEngineWrapper {
|
||||
for _, s := range shards {
|
||||
pool, err := ants.NewPool(10, ants.WithNonblocking(true))
|
||||
require.NoError(t, err)
|
||||
|
||||
engine.shards[s.ID().String()] = hashedShard{
|
||||
te.engine.shards[s.ID().String()] = hashedShard{
|
||||
shardWrapper: shardWrapper{
|
||||
errorCount: atomic.NewUint32(0),
|
||||
Shard: s,
|
||||
},
|
||||
hash: hrw.Hash([]byte(s.ID().String())),
|
||||
}
|
||||
engine.shardPools[s.ID().String()] = pool
|
||||
te.engine.shardPools[s.ID().String()] = pool
|
||||
te.shardIDs = append(te.shardIDs, s.ID())
|
||||
}
|
||||
return te
|
||||
}
|
||||
|
||||
return engine
|
||||
func (te *testEngineWrapper) setShardsNum(t testing.TB, num int) *testEngineWrapper {
|
||||
shards := make([]*shard.Shard, 0, num)
|
||||
|
||||
for i := 0; i < num; i++ {
|
||||
shards = append(shards, testNewShard(t, i))
|
||||
}
|
||||
|
||||
return te.setInitializedShards(t, shards...)
|
||||
}
|
||||
|
||||
func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
|
||||
for i := 0; i < num; i++ {
|
||||
opts := shardOpts(i)
|
||||
id, err := te.engine.AddShard(opts...)
|
||||
require.NoError(t, err)
|
||||
te.shardIDs = append(te.shardIDs, id)
|
||||
}
|
||||
return te
|
||||
}
|
||||
|
||||
func (te *testEngineWrapper) setShardsNumAdditionalOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
|
||||
for i := 0; i < num; i++ {
|
||||
defaultOpts := testDefaultShardOptions(t, i)
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
If we want our function to override defaults, If we want our function to override defaults, `defaultOpts` should come first.
|
||||
opts := append(defaultOpts, shardOpts(i)...)
|
||||
id, err := te.engine.AddShard(opts...)
|
||||
require.NoError(t, err)
|
||||
te.shardIDs = append(te.shardIDs, id)
|
||||
}
|
||||
return te
|
||||
}
|
||||
|
||||
func newStorages(root string, smallSize uint64) []blobstor.SubStorage {
|
||||
|
@ -145,8 +190,17 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
|
|||
sid, err := generateShardID()
|
||||
require.NoError(t, err)
|
||||
|
||||
s := shard.New(
|
||||
shard.WithID(sid),
|
||||
shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...)
|
||||
s := shard.New(shardOpts...)
|
||||
|
||||
require.NoError(t, s.Open())
|
||||
require.NoError(t, s.Init(context.Background()))
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func testDefaultShardOptions(t testing.TB, id int) []shard.Option {
|
||||
return []shard.Option{
|
||||
shard.WithLogger(&logger.Logger{Logger: zap.L()}),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithStorages(
|
||||
|
@ -157,46 +211,5 @@ func testNewShard(t testing.TB, id int) *shard.Shard {
|
|||
meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("%d.metabase", id))),
|
||||
meta.WithPermissions(0700),
|
||||
meta.WithEpochState(epochState{}),
|
||||
))
|
||||
|
||||
require.NoError(t, s.Open())
|
||||
require.NoError(t, s.Init(context.Background()))
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func testEngineFromShardOpts(t *testing.T, num int, extraOpts []shard.Option) *StorageEngine {
|
||||
engine := New()
|
||||
for i := 0; i < num; i++ {
|
||||
_, err := engine.AddShard(append([]shard.Option{
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithStorages(
|
||||
newStorages(filepath.Join(t.Name(), fmt.Sprintf("blobstor%d", i)),
|
||||
1<<20)),
|
||||
),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(filepath.Join(t.Name(), fmt.Sprintf("metabase%d", i))),
|
||||
meta.WithPermissions(0700),
|
||||
meta.WithEpochState(epochState{}),
|
||||
),
|
||||
shard.WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(t.Name(), fmt.Sprintf("pilorama%d", i)))),
|
||||
}, extraOpts...)...)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, engine.Open())
|
||||
require.NoError(t, engine.Init(context.Background()))
|
||||
|
||||
return engine
|
||||
}
|
||||
|
||||
func testNewEngineWithShardNum(t *testing.T, num int) *StorageEngine {
|
||||
shards := make([]*shard.Shard, 0, num)
|
||||
|
||||
for i := 0; i < num; i++ {
|
||||
shards = append(shards, testNewShard(t, i))
|
||||
}
|
||||
|
||||
return testNewEngineWithShards(t, shards...)
|
||||
)}
|
||||
}
|
||||
|
|
|
@ -48,37 +48,39 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
|||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||
}
|
||||
|
||||
e := New(
|
||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
WithShardPoolSize(1),
|
||||
WithErrorThreshold(errThreshold))
|
||||
|
||||
var testShards [2]*testShard
|
||||
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Can Can `testNewEngine` just accept `engine` opts as a varargs?
|
||||
for i := range testShards {
|
||||
storages, smallFileStorage, largeFileStorage := newTestStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize)
|
||||
id, err := e.AddShard(
|
||||
te := testNewEngine(t,
|
||||
WithShardPoolSize(1),
|
||||
WithErrorThreshold(errThreshold),
|
||||
).
|
||||
setShardsNumOpts(t, 2, func(id int) []shard.Option {
|
||||
storages, smallFileStorage, largeFileStorage := newTestStorages(filepath.Join(dir, strconv.Itoa(id)), errSmallSize)
|
||||
testShards[id] = &testShard{
|
||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
I would be glad to take any suggestion about I would be glad to take any suggestion about `testShards` initialization. I tried to use intoduced `WithShardsNumFromOpts` but after rebase I was obliged to apply this hack with `testShards`.
It's still possible to invoke `enginge.AddShard` out of `testNewEngine`
|
||||
smallFileStorage: smallFileStorage,
|
||||
largeFileStorage: largeFileStorage,
|
||||
}
|
||||
return []shard.Option{
|
||||
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
shard.WithBlobStorOptions(blobstor.WithStorages(storages)),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))),
|
||||
meta.WithPermissions(0700),
|
||||
meta.WithEpochState(epochState{}),
|
||||
),
|
||||
shard.WithPiloramaOptions(
|
||||
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))),
|
||||
pilorama.WithPerm(0700)))
|
||||
require.NoError(t, err)
|
||||
|
||||
testShards[i] = &testShard{
|
||||
id: id,
|
||||
smallFileStorage: smallFileStorage,
|
||||
largeFileStorage: largeFileStorage,
|
||||
}
|
||||
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", id))),
|
||||
pilorama.WithPerm(0700)),
|
||||
}
|
||||
})
|
||||
e := te.engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
for i, id := range te.shardIDs {
|
||||
testShards[i].id = id
|
||||
}
|
||||
|
||||
return &testEngine{
|
||||
ng: e,
|
||||
dir: dir,
|
||||
|
|
|
@ -29,28 +29,23 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
|
|||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||
|
||||
e := New(
|
||||
WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
WithShardPoolSize(1))
|
||||
|
||||
ids := make([]*shard.ID, shardNum)
|
||||
|
||||
for i := range ids {
|
||||
ids[i], err = e.AddShard(
|
||||
te := testNewEngine(t, WithShardPoolSize(1)).
|
||||
setShardsNumOpts(t, shardNum, func(id int) []shard.Option {
|
||||
return []shard.Option{
|
||||
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithStorages([]blobstor.SubStorage{{
|
||||
Storage: fstree.New(
|
||||
fstree.WithPath(filepath.Join(dir, strconv.Itoa(i))),
|
||||
fstree.WithPath(filepath.Join(dir, strconv.Itoa(id))),
|
||||
fstree.WithDepth(1)),
|
||||
}})),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", id))),
|
||||
meta.WithPermissions(0700),
|
||||
meta.WithEpochState(epochState{}),
|
||||
))
|
||||
require.NoError(t, err)
|
||||
meta.WithEpochState(epochState{})),
|
||||
}
|
||||
})
|
||||
e, ids := te.engine, te.shardIDs
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ func TestHeadRaw(t *testing.T) {
|
|||
s1 := testNewShard(t, 1)
|
||||
s2 := testNewShard(t, 2)
|
||||
|
||||
e := testNewEngineWithShards(t, s1, s2)
|
||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
||||
carpawell marked this conversation as resolved
Outdated
carpawell
commented
do we have a getter for do we have a getter for `engine` (do not see it)? if no, exported setters look useless to me cause no external package needs `testEngineWrapper` but also no external package can get nested `engine`
aarifullin
commented
Good point. I've made Good point. I've made `setXXX` methods internal
|
||||
defer e.Close()
|
||||
|
||||
var putPrmLeft shard.PutPrm
|
||||
|
|
|
@ -38,7 +38,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
link.SetSplitID(splitID)
|
||||
|
||||
t.Run("delete small object", func(t *testing.T) {
|
||||
e := testNewEngineWithShardNum(t, 1)
|
||||
e := testNewEngine(t).setShardsNum(t, 1).engine
|
||||
defer e.Close()
|
||||
|
||||
err := Put(e, parent)
|
||||
|
@ -59,7 +59,7 @@ func TestStorageEngine_Inhume(t *testing.T) {
|
|||
s1 := testNewShard(t, 1)
|
||||
s2 := testNewShard(t, 2)
|
||||
|
||||
e := testNewEngineWithShards(t, s1, s2)
|
||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
||||
defer e.Close()
|
||||
|
||||
var putChild shard.PutPrm
|
||||
|
|
|
@ -16,7 +16,7 @@ import (
|
|||
func TestListWithCursor(t *testing.T) {
|
||||
s1 := testNewShard(t, 1)
|
||||
s2 := testNewShard(t, 2)
|
||||
e := testNewEngineWithShards(t, s1, s2)
|
||||
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
|
||||
|
||||
t.Cleanup(func() {
|
||||
e.Close()
|
||||
|
|
|
@ -45,7 +45,9 @@ func TestLockUserScenario(t *testing.T) {
|
|||
tombForLockID := oidtest.ID()
|
||||
tombObj.SetID(tombForLockID)
|
||||
|
||||
e := testEngineFromShardOpts(t, 2, []shard.Option{
|
||||
testEngine := testNewEngine(t).
|
||||
setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
|
||||
return []shard.Option{
|
||||
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
||||
pool, err := ants.NewPool(sz)
|
||||
require.NoError(t, err)
|
||||
|
@ -53,7 +55,11 @@ func TestLockUserScenario(t *testing.T) {
|
|||
return pool
|
||||
}),
|
||||
shard.WithTombstoneSource(tss{lockerExpiresAfter}),
|
||||
}
|
||||
})
|
||||
e := testEngine.engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
|
@ -146,14 +152,20 @@ func TestLockExpiration(t *testing.T) {
|
|||
// 3. lock expiration epoch is coming
|
||||
// 4. after some delay the object is not locked anymore
|
||||
|
||||
e := testEngineFromShardOpts(t, 2, []shard.Option{
|
||||
testEngine := testNewEngine(t).
|
||||
setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
|
||||
return []shard.Option{
|
||||
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
||||
pool, err := ants.NewPool(sz)
|
||||
require.NoError(t, err)
|
||||
|
||||
return pool
|
||||
}),
|
||||
}
|
||||
})
|
||||
e := testEngine.engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
|
@ -218,7 +230,9 @@ func TestLockForceRemoval(t *testing.T) {
|
|||
// 5. the object is not locked anymore
|
||||
var e *StorageEngine
|
||||
|
||||
e = testEngineFromShardOpts(t, 2, []shard.Option{
|
||||
e = testNewEngine(t).
|
||||
setShardsNumAdditionalOpts(t, 2, func(id int) []shard.Option {
|
||||
return []shard.Option{
|
||||
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
||||
pool, err := ants.NewPool(sz)
|
||||
require.NoError(t, err)
|
||||
|
@ -226,8 +240,10 @@ func TestLockForceRemoval(t *testing.T) {
|
|||
return pool
|
||||
}),
|
||||
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
||||
})
|
||||
|
||||
}
|
||||
}).engine
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init(context.Background()))
|
||||
t.Cleanup(func() {
|
||||
_ = e.Close()
|
||||
_ = os.RemoveAll(t.Name())
|
||||
|
|
|
@ -10,7 +10,8 @@ import (
|
|||
func TestRemoveShard(t *testing.T) {
|
||||
const numOfShards = 6
|
||||
|
||||
e := testNewEngineWithShardNum(t, numOfShards)
|
||||
te := testNewEngine(t).setShardsNum(t, numOfShards)
|
||||
e, ids := te.engine, te.shardIDs
|
||||
t.Cleanup(func() {
|
||||
e.Close()
|
||||
os.RemoveAll(t.Name())
|
||||
|
@ -22,12 +23,12 @@ func TestRemoveShard(t *testing.T) {
|
|||
removedNum := numOfShards / 2
|
||||
|
||||
mSh := make(map[string]bool, numOfShards)
|
||||
for i, sh := range e.DumpInfo().Shards {
|
||||
for i, id := range ids {
|
||||
if i == removedNum {
|
||||
break
|
||||
}
|
||||
|
||||
mSh[sh.ID.String()] = true
|
||||
mSh[id.String()] = true
|
||||
}
|
||||
|
||||
for id, remove := range mSh {
|
||||
|
|
Maybe just
WithShardsAdditionalOpts(func(int) []shard.Option)
?