[#139] test: Add test storage implementation
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
This aims to reduce the usage of chmod hackery to induce or simulate OS-related failures. Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
parent
e843e7f090
commit
341fe1688f
20 changed files with 617 additions and 208 deletions
|
@ -9,6 +9,8 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
|
@ -24,7 +26,19 @@ import (
|
|||
|
||||
const errSmallSize = 256
|
||||
|
||||
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
|
||||
type testEngine struct {
|
||||
ng *StorageEngine
|
||||
dir string
|
||||
shards [2]*testShard
|
||||
}
|
||||
|
||||
type testShard struct {
|
||||
id *shard.ID
|
||||
smallFileStorage *teststore.TestStore
|
||||
largeFileStorage *teststore.TestStore
|
||||
}
|
||||
|
||||
func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) *testEngine {
|
||||
if dir == "" {
|
||||
var err error
|
||||
|
||||
|
@ -38,14 +52,13 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
|||
WithShardPoolSize(1),
|
||||
WithErrorThreshold(errThreshold))
|
||||
|
||||
var ids [2]*shard.ID
|
||||
var err error
|
||||
var testShards [2]*testShard
|
||||
|
||||
for i := range ids {
|
||||
ids[i], err = e.AddShard(
|
||||
for i := range testShards {
|
||||
storages, smallFileStorage, largeFileStorage := newTestStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize)
|
||||
id, err := e.AddShard(
|
||||
shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithStorages(newStorages(filepath.Join(dir, strconv.Itoa(i)), errSmallSize))),
|
||||
shard.WithBlobStorOptions(blobstor.WithStorages(storages)),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
||||
meta.WithPermissions(0700),
|
||||
|
@ -55,94 +68,111 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
|
|||
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", i))),
|
||||
pilorama.WithPerm(0700)))
|
||||
require.NoError(t, err)
|
||||
|
||||
testShards[i] = &testShard{
|
||||
id: id,
|
||||
smallFileStorage: smallFileStorage,
|
||||
largeFileStorage: largeFileStorage,
|
||||
}
|
||||
}
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init())
|
||||
|
||||
return e, dir, ids
|
||||
return &testEngine{
|
||||
ng: e,
|
||||
dir: dir,
|
||||
shards: testShards,
|
||||
}
|
||||
}
|
||||
|
||||
func TestErrorReporting(t *testing.T) {
|
||||
t.Run("ignore errors by default", func(t *testing.T) {
|
||||
e, dir, id := newEngineWithErrorThreshold(t, "", 0)
|
||||
te := newEngineWithErrorThreshold(t, "", 0)
|
||||
|
||||
obj := testutil.GenerateObjectWithCID(cidtest.ID())
|
||||
obj.SetPayload(make([]byte, errSmallSize))
|
||||
|
||||
var prm shard.PutPrm
|
||||
prm.SetObject(obj)
|
||||
e.mtx.RLock()
|
||||
_, err := e.shards[id[0].String()].Shard.Put(prm)
|
||||
e.mtx.RUnlock()
|
||||
te.ng.mtx.RLock()
|
||||
_, err := te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
|
||||
te.ng.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.NoError(t, err)
|
||||
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
|
||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
for _, shard := range te.shards {
|
||||
shard.largeFileStorage.SetOption(teststore.WithGet(func(common.GetPrm) (common.GetRes, error) {
|
||||
return common.GetRes{}, teststore.ErrDiskExploded
|
||||
}))
|
||||
}
|
||||
|
||||
for i := uint32(1); i < 3; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
}
|
||||
})
|
||||
t.Run("with error threshold", func(t *testing.T) {
|
||||
const errThreshold = 3
|
||||
|
||||
e, dir, id := newEngineWithErrorThreshold(t, "", errThreshold)
|
||||
te := newEngineWithErrorThreshold(t, "", errThreshold)
|
||||
|
||||
obj := testutil.GenerateObjectWithCID(cidtest.ID())
|
||||
obj.SetPayload(make([]byte, errSmallSize))
|
||||
|
||||
var prm shard.PutPrm
|
||||
prm.SetObject(obj)
|
||||
e.mtx.RLock()
|
||||
_, err := e.shards[id[0].String()].Put(prm)
|
||||
e.mtx.RUnlock()
|
||||
te.ng.mtx.RLock()
|
||||
_, err := te.ng.shards[te.shards[0].id.String()].Put(prm)
|
||||
te.ng.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.NoError(t, err)
|
||||
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
|
||||
corruptSubDir(t, filepath.Join(dir, "0"))
|
||||
for _, shard := range te.shards {
|
||||
shard.largeFileStorage.SetOption(teststore.WithGet(func(common.GetPrm) (common.GetRes, error) {
|
||||
return common.GetRes{}, teststore.ErrDiskExploded
|
||||
}))
|
||||
}
|
||||
|
||||
for i := uint32(1); i < errThreshold; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], i, mode.ReadWrite)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, i, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
}
|
||||
|
||||
for i := uint32(0); i < 2; i++ {
|
||||
_, err = e.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
_, err = te.ng.Get(GetPrm{addr: object.AddressOf(obj)})
|
||||
require.Error(t, err)
|
||||
checkShardState(t, e, id[0], errThreshold+i, mode.DegradedReadOnly)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, errThreshold+i, mode.DegradedReadOnly)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
}
|
||||
|
||||
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, false))
|
||||
checkShardState(t, e, id[0], errThreshold+1, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, false))
|
||||
checkShardState(t, te.ng, te.shards[0].id, errThreshold+1, mode.ReadWrite)
|
||||
|
||||
require.NoError(t, e.SetShardMode(id[0], mode.ReadWrite, true))
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.SetShardMode(te.shards[0].id, mode.ReadWrite, true))
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
})
|
||||
}
|
||||
|
||||
// Issue #1186.
|
||||
func TestBlobstorFailback(t *testing.T) {
|
||||
dir, err := os.MkdirTemp("", "*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
|
||||
|
||||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||
te := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
objs := make([]*objectSDK.Object, 0, 2)
|
||||
for _, size := range []int{15, errSmallSize + 1} {
|
||||
|
@ -151,49 +181,49 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
|
||||
var prm shard.PutPrm
|
||||
prm.SetObject(obj)
|
||||
e.mtx.RLock()
|
||||
_, err = e.shards[id[0].String()].Shard.Put(prm)
|
||||
e.mtx.RUnlock()
|
||||
te.ng.mtx.RLock()
|
||||
_, err = te.ng.shards[te.shards[0].id.String()].Shard.Put(prm)
|
||||
te.ng.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
objs = append(objs, obj)
|
||||
}
|
||||
|
||||
for i := range objs {
|
||||
addr := object.AddressOf(objs[i])
|
||||
_, err = e.Get(GetPrm{addr: addr})
|
||||
_, err = te.ng.Get(GetPrm{addr: addr})
|
||||
require.NoError(t, err)
|
||||
_, err = e.GetRange(RngPrm{addr: addr})
|
||||
_, err = te.ng.GetRange(RngPrm{addr: addr})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
checkShardState(t, e, id[0], 0, mode.ReadWrite)
|
||||
require.NoError(t, e.Close())
|
||||
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
|
||||
require.NoError(t, te.ng.Close())
|
||||
|
||||
p1 := e.shards[id[0].String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
p2 := e.shards[id[1].String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
p1 := te.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
p2 := te.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().BlobStorInfo.SubStorages[1].Path
|
||||
tmp := filepath.Join(dir, "tmp")
|
||||
require.NoError(t, os.Rename(p1, tmp))
|
||||
require.NoError(t, os.Rename(p2, p1))
|
||||
require.NoError(t, os.Rename(tmp, p2))
|
||||
|
||||
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
|
||||
te = newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
for i := range objs {
|
||||
addr := object.AddressOf(objs[i])
|
||||
getRes, err := e.Get(GetPrm{addr: addr})
|
||||
getRes, err := te.ng.Get(GetPrm{addr: addr})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i], getRes.Object())
|
||||
|
||||
rngRes, err := e.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
|
||||
rngRes, err := te.ng.GetRange(RngPrm{addr: addr, off: 1, ln: 10})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
|
||||
|
||||
_, err = e.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
|
||||
_, err = te.ng.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1})
|
||||
require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{})
|
||||
}
|
||||
|
||||
checkShardState(t, e, id[0], 1, mode.DegradedReadOnly)
|
||||
checkShardState(t, e, id[1], 0, mode.ReadWrite)
|
||||
checkShardState(t, te.ng, te.shards[0].id, 1, mode.DegradedReadOnly)
|
||||
checkShardState(t, te.ng, te.shards[1].id, 0, mode.ReadWrite)
|
||||
}
|
||||
|
||||
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) {
|
||||
|
@ -204,19 +234,3 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
|
|||
require.Equal(t, errCount, sh.errorCount.Load())
|
||||
require.Equal(t, mode, sh.GetMode())
|
||||
}
|
||||
|
||||
// corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable.
|
||||
func corruptSubDir(t *testing.T, dir string) {
|
||||
de, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
|
||||
// FIXME(@cthulhu-rider): copy-paste of unexported const from blobstor package, see #1407
|
||||
const dirBlobovnicza = "blobovnicza"
|
||||
|
||||
for i := range de {
|
||||
if de[i].IsDir() && de[i].Name() != dirBlobovnicza {
|
||||
require.NoError(t, os.Chmod(filepath.Join(dir, de[i].Name()), 0))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue