package engine

import (
	"context"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strconv"
	"testing"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	"github.com/stretchr/testify/require"
	"go.etcd.io/bbolt"
	"go.uber.org/atomic"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"
)

// TestInitializationFailure checks that a shard is initialized and closed
// even if the media under any single component is absent.
func TestInitializationFailure(t *testing.T) {
	type openFileFunc func(string, int, fs.FileMode) (*os.File, error)

	type testShardOpts struct {
		openFileMetabase   openFileFunc
		openFileWriteCache openFileFunc
		openFilePilorama   openFileFunc
	}

	testShard := func(opts testShardOpts) ([]shard.Option, *teststore.TestStore, *teststore.TestStore) {
		sid, err := generateShardID()
		require.NoError(t, err)

		tempDir := t.TempDir()
		blobstorPath := filepath.Join(tempDir, "bs")
		metabasePath := filepath.Join(tempDir, "mb")
		writecachePath := filepath.Join(tempDir, "wc")
		piloramaPath := filepath.Join(tempDir, "pl")

		storages, smallFileStorage, largeFileStorage := newTestStorages(blobstorPath, 1<<20)

		return []shard.Option{
			shard.WithID(sid),
			shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
			shard.WithBlobStorOptions(
				blobstor.WithStorages(storages)),
			shard.WithMetaBaseOptions(
				meta.WithBoltDBOptions(&bbolt.Options{
					Timeout:  100 * time.Millisecond,
					OpenFile: opts.openFileMetabase,
				}),
				meta.WithPath(metabasePath),
				meta.WithPermissions(0700),
				meta.WithEpochState(epochState{})),
			shard.WithWriteCache(true),
			shard.WithWriteCacheOptions(
				writecache.WithPath(writecachePath),
				writecache.WithOpenFile(opts.openFileWriteCache),
			),
			shard.WithPiloramaOptions(
				pilorama.WithPath(piloramaPath),
				pilorama.WithOpenFile(opts.openFilePilorama),
			),
		}, smallFileStorage, largeFileStorage
	}

	t.Run("blobstor", func(t *testing.T) {
		shardOpts, _, largeFileStorage := testShard(testShardOpts{
			openFileMetabase:   os.OpenFile,
			openFileWriteCache: os.OpenFile,
			openFilePilorama:   os.OpenFile,
		})
		largeFileStorage.SetOption(teststore.WithOpen(func(ro bool) error {
			return teststore.ErrDiskExploded
		}))
		beforeReload := func() {
			largeFileStorage.SetOption(teststore.WithOpen(nil))
		}
		testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
	})
	t.Run("metabase", func(t *testing.T) {
		var openFileMetabaseSucceed atomic.Bool
		openFileMetabase := func(p string, f int, mode fs.FileMode) (*os.File, error) {
			if openFileMetabaseSucceed.Load() {
				return os.OpenFile(p, f, mode)
			}
			return nil, teststore.ErrDiskExploded
		}
		beforeReload := func() {
			openFileMetabaseSucceed.Store(true)
		}
		shardOpts, _, _ := testShard(testShardOpts{
			openFileMetabase:   openFileMetabase,
			openFileWriteCache: os.OpenFile,
			openFilePilorama:   os.OpenFile,
		})
		testEngineFailInitAndReload(t, true, shardOpts, beforeReload)
	})
	t.Run("write-cache", func(t *testing.T) {
		var openFileWriteCacheSucceed atomic.Bool
		openFileWriteCache := func(p string, f int, mode fs.FileMode) (*os.File, error) {
			if openFileWriteCacheSucceed.Load() {
				return os.OpenFile(p, f, mode)
			}
			return nil, teststore.ErrDiskExploded
		}
		beforeReload := func() {
			openFileWriteCacheSucceed.Store(true)
		}
		shardOpts, _, _ := testShard(testShardOpts{
			openFileMetabase:   os.OpenFile,
			openFileWriteCache: openFileWriteCache,
			openFilePilorama:   os.OpenFile,
		})
		testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
	})
	t.Run("pilorama", func(t *testing.T) {
		var openFilePiloramaSucceed atomic.Bool
		openFilePilorama := func(p string, f int, mode fs.FileMode) (*os.File, error) {
			if openFilePiloramaSucceed.Load() {
				return os.OpenFile(p, f, mode)
			}
			return nil, teststore.ErrDiskExploded
		}
		beforeReload := func() {
			openFilePiloramaSucceed.Store(true)
		}
		shardOpts, _, _ := testShard(testShardOpts{
			openFileMetabase:   os.OpenFile,
			openFileWriteCache: os.OpenFile,
			openFilePilorama:   openFilePilorama,
		})
		testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
	})
}
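
// testEngineFailInitAndReload adds a shard built from opts to a fresh engine,
// expects the injected failure to surface (from AddShard itself when errOnAdd
// is set, otherwise from Open/Init), and then checks that the engine can be
// reloaded with the same configuration once beforeReload has removed the
// simulated fault.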
func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Option, beforeReload func()) {
	var configID string

	e := New()
	_, err := e.AddShard(opts...)
	if errOnAdd {
		require.Error(t, err)
		// This branch is only taken when we cannot update shard ID in the metabase.
		// The id cannot be encountered during normal operation, but it is ok for tests:
		// it is only compared for equality with other ids and we have 0 shards here.
		configID = "id"
	} else {
		require.NoError(t, err)

		e.mtx.RLock()
		var id string
		for id = range e.shards {
			break
		}
		configID = calculateShardID(e.shards[id].Shard.DumpInfo())
		e.mtx.RUnlock()

		err = e.Open()
		if err == nil {
			require.Error(t, e.Init(context.Background()))
		}
	}

	e.mtx.RLock()
	shardCount := len(e.shards)
	e.mtx.RUnlock()
	require.Equal(t, 0, shardCount)

	beforeReload()

	require.NoError(t, e.Reload(context.Background(), ReConfiguration{
		shards: map[string][]shard.Option{configID: opts},
	}))

	e.mtx.RLock()
	shardCount = len(e.shards)
	e.mtx.RUnlock()
	require.Equal(t, 1, shardCount)
}

// TestExecBlocks checks that BlockExecution makes data-related operations
// fail with the blocking error until ResumeExecution is called, and that
// neither operations nor resuming work after the engine is closed.
func TestExecBlocks(t *testing.T) {
	e := testNewEngine(t).setShardsNum(t, 2).engine // number doesn't matter in this test, 2 is several but not many
	t.Cleanup(func() {
		os.RemoveAll(t.Name())
	})

	// put some object
	obj := testutil.GenerateObjectWithCID(cidtest.ID())
	addr := object.AddressOf(obj)
	require.NoError(t, Put(e, obj))

	// block executions
	errBlock := errors.New("block exec err")
	require.NoError(t, e.BlockExecution(errBlock))

	// try to exec some op
	_, err := Head(context.Background(), e, addr)
	require.ErrorIs(t, err, errBlock)

	// resume executions
	require.NoError(t, e.ResumeExecution())
	_, err = Head(context.Background(), e, addr) // can be any data-related op
	require.NoError(t, err)

	// close
	require.NoError(t, e.Close())

	// try exec after close
	_, err = Head(context.Background(), e, addr)
	require.Error(t, err)

	// try to resume
	require.Error(t, e.ResumeExecution())
}
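
// TestPersistentShardID checks that shard IDs survive an engine restart and
// follow their metabase files: after the metabase paths of two shards are
// swapped on disk, the restarted engine reports the IDs swapped as well.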
func TestPersistentShardID(t *testing.T) {
	dir, err := os.MkdirTemp("", "*")
	require.NoError(t, err)

	te := newEngineWithErrorThreshold(t, dir, 1)
	checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
	require.NoError(t, te.ng.Close())

	newTe := newEngineWithErrorThreshold(t, dir, 1)
	for i := 0; i < len(newTe.shards); i++ {
		require.Equal(t, te.shards[i].id, newTe.shards[i].id)
	}
	require.NoError(t, newTe.ng.Close())

	p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
	p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
	tmp := filepath.Join(dir, "tmp")
	require.NoError(t, os.Rename(p1, tmp))
	require.NoError(t, os.Rename(p2, p1))
	require.NoError(t, os.Rename(tmp, p2))

	newTe = newEngineWithErrorThreshold(t, dir, 1)
	require.Equal(t, te.shards[1].id, newTe.shards[0].id)
	require.Equal(t, te.shards[0].id, newTe.shards[1].id)
	require.NoError(t, newTe.ng.Close())
}

// TestReload checks that Reload adds shards only for previously unknown
// configuration paths and removes shards that are absent from the new
// configuration.
func TestReload(t *testing.T) {
	path := t.TempDir()

	t.Run("add shards", func(t *testing.T) {
		const shardNum = 4
		addPath := filepath.Join(path, "add")

		e, currShards := engineWithShards(t, addPath, shardNum)

		var rcfg ReConfiguration
		for _, p := range currShards {
			rcfg.AddShard(p, nil)
		}
		rcfg.AddShard(currShards[0], nil) // same path
		require.NoError(t, e.Reload(context.Background(), rcfg))

		// no new paths => no new shards
		require.Equal(t, shardNum, len(e.shards))
		require.Equal(t, shardNum, len(e.shardPools))

		newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum))

		// add new shard
		rcfg.AddShard(newMeta, []shard.Option{shard.WithMetaBaseOptions(
			meta.WithPath(newMeta),
			meta.WithEpochState(epochState{}),
		)})
		require.NoError(t, e.Reload(context.Background(), rcfg))

		require.Equal(t, shardNum+1, len(e.shards))
		require.Equal(t, shardNum+1, len(e.shardPools))
	})

	t.Run("remove shards", func(t *testing.T) {
		const shardNum = 4
		removePath := filepath.Join(path, "remove")

		e, currShards := engineWithShards(t, removePath, shardNum)

		var rcfg ReConfiguration
		for i := 0; i < len(currShards)-1; i++ { // without one of the shards
			rcfg.AddShard(currShards[i], nil)
		}
		require.NoError(t, e.Reload(context.Background(), rcfg))

		// removed one
		require.Equal(t, shardNum-1, len(e.shards))
		require.Equal(t, shardNum-1, len(e.shardPools))
	})
}

// engineWithShards creates an opened and initialized engine with the
// specified number of shards. It returns the engine and a slice of config
// IDs derived from the shards' metabase paths.
func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []string) {
	addPath := filepath.Join(path, "add")
	currShards := make([]string, 0, num)

	te := testNewEngine(t).
		setShardsNumOpts(t, num, func(id int) []shard.Option {
			return []shard.Option{
				shard.WithLogger(&logger.Logger{Logger: zap.L()}),
				shard.WithBlobStorOptions(
					blobstor.WithStorages(newStorages(filepath.Join(addPath, strconv.Itoa(id)), errSmallSize))),
				shard.WithMetaBaseOptions(
					meta.WithPath(filepath.Join(addPath, fmt.Sprintf("%d.metabase", id))),
					meta.WithPermissions(0700),
					meta.WithEpochState(epochState{}),
				),
			}
		})
	e, ids := te.engine, te.shardIDs

	for _, id := range ids {
		currShards = append(currShards, calculateShardID(e.shards[id.String()].DumpInfo()))
	}
	require.Equal(t, num, len(e.shards))
	require.Equal(t, num, len(e.shardPools))

	require.NoError(t, e.Open())
	require.NoError(t, e.Init(context.Background()))

	return e, currShards
}