frostfs-node/pkg/local_object_storage/engine/control_test.go

320 lines
9.4 KiB
Go
Raw Permalink Normal View History

package engine
import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"strconv"
"sync/atomic"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
// TestInitializationFailure checks that shard is initialized and closed even if media
// under any single component is absent.
func TestInitializationFailure(t *testing.T) {
type openFileFunc func(string, int, fs.FileMode) (*os.File, error)
type testShardOpts struct {
openFileMetabase openFileFunc
openFilePilorama openFileFunc
}
testShard := func(opts testShardOpts) ([]shard.Option, *teststore.TestStore, *teststore.TestStore) {
sid, err := generateShardID()
require.NoError(t, err)
storages, smallFileStorage, largeFileStorage := newTestStorages(t.TempDir(), 1<<20)
wcOpts := []writecache.Option{
writecache.WithPath(t.TempDir()),
}
return []shard.Option{
shard.WithID(sid),
shard.WithLogger(test.NewLogger(t)),
shard.WithBlobStorOptions(
blobstor.WithStorages(storages)),
shard.WithMetaBaseOptions(
meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond,
OpenFile: opts.openFileMetabase,
}),
meta.WithPath(filepath.Join(t.TempDir(), "metabase")),
meta.WithPermissions(0o700),
meta.WithEpochState(epochState{})),
shard.WithWriteCache(true),
shard.WithWriteCacheOptions(wcOpts),
shard.WithPiloramaOptions(
pilorama.WithPath(filepath.Join(t.TempDir(), "pilorama")),
pilorama.WithOpenFile(opts.openFilePilorama),
),
}, smallFileStorage, largeFileStorage
}
t.Run("blobstor", func(t *testing.T) {
shardOpts, _, largeFileStorage := testShard(testShardOpts{
openFileMetabase: os.OpenFile,
openFilePilorama: os.OpenFile,
})
largeFileStorage.SetOption(teststore.WithOpen(func(primitiveMode mode.ComponentMode) error {
return teststore.ErrDiskExploded
}))
beforeReload := func() {
largeFileStorage.SetOption(teststore.WithOpen(nil))
}
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
})
t.Run("metabase", func(t *testing.T) {
var openFileMetabaseSucceed atomic.Bool
openFileMetabase := func(p string, f int, mode fs.FileMode) (*os.File, error) {
if openFileMetabaseSucceed.Load() {
return os.OpenFile(p, f, mode)
}
return nil, teststore.ErrDiskExploded
}
beforeReload := func() {
openFileMetabaseSucceed.Store(true)
}
shardOpts, _, _ := testShard(testShardOpts{
openFileMetabase: openFileMetabase,
openFilePilorama: os.OpenFile,
})
testEngineFailInitAndReload(t, true, shardOpts, beforeReload)
})
t.Run("pilorama", func(t *testing.T) {
var openFilePiloramaSucceed atomic.Bool
openFilePilorama := func(p string, f int, mode fs.FileMode) (*os.File, error) {
if openFilePiloramaSucceed.Load() {
return os.OpenFile(p, f, mode)
}
return nil, teststore.ErrDiskExploded
}
beforeReload := func() {
openFilePiloramaSucceed.Store(true)
}
shardOpts, _, _ := testShard(testShardOpts{
openFileMetabase: os.OpenFile,
openFilePilorama: openFilePilorama,
})
testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
})
}
func testEngineFailInitAndReload(t *testing.T, degradedMode bool, opts []shard.Option, beforeReload func()) {
var configID string
e := New()
defer func() {
require.NoError(t, e.Close(context.Background()))
}()
_, err := e.AddShard(context.Background(), opts...)
require.NoError(t, err)
e.mtx.RLock()
var id string
for id = range e.shards {
break
}
configID = calculateShardID(e.shards[id].Shard.DumpInfo())
e.mtx.RUnlock()
err = e.Open(context.Background())
require.NoError(t, err)
if degradedMode {
require.NoError(t, e.Init(context.Background()))
require.Equal(t, mode.DegradedReadOnly, e.DumpInfo().Shards[0].Mode)
return
} else {
require.Error(t, e.Init(context.Background()))
e.mtx.RLock()
shardCount := len(e.shards)
e.mtx.RUnlock()
require.Equal(t, 0, shardCount)
}
beforeReload()
require.NoError(t, e.Reload(context.Background(), ReConfiguration{
shards: map[string][]shard.Option{configID: opts},
}))
e.mtx.RLock()
shardCount := len(e.shards)
e.mtx.RUnlock()
require.Equal(t, 1, shardCount)
}
func TestExecBlocks(t *testing.T) {
e := testNewEngine(t).setShardsNum(t, 2).engine // number doesn't matter in this test, 2 is several but not many
// put some object
obj := testutil.GenerateObjectWithCID(cidtest.ID())
addr := object.AddressOf(obj)
require.NoError(t, Put(context.Background(), e, obj))
// block executions
errBlock := errors.New("block exec err")
require.NoError(t, e.BlockExecution(errBlock))
// try to exec some op
_, err := Head(context.Background(), e, addr)
require.ErrorIs(t, err, errBlock)
// resume executions
require.NoError(t, e.ResumeExecution())
_, err = Head(context.Background(), e, addr) // can be any data-related op
require.NoError(t, err)
// close
require.NoError(t, e.Close(context.Background()))
// try exec after close
_, err = Head(context.Background(), e, addr)
require.Error(t, err)
// try to resume
require.Error(t, e.ResumeExecution())
}
func TestPersistentShardID(t *testing.T) {
dir := t.TempDir()
te := newEngineWithErrorThreshold(t, dir, 1)
checkShardState(t, te.ng, te.shards[0].id, 0, mode.ReadWrite)
require.NoError(t, te.ng.Close(context.Background()))
newTe := newEngineWithErrorThreshold(t, dir, 1)
for i := 0; i < len(newTe.shards); i++ {
require.Equal(t, te.shards[i].id, newTe.shards[i].id)
}
require.NoError(t, newTe.ng.Close(context.Background()))
p1 := newTe.ng.shards[te.shards[0].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
p2 := newTe.ng.shards[te.shards[1].id.String()].Shard.DumpInfo().MetaBaseInfo.Path
tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1))
require.NoError(t, os.Rename(tmp, p2))
newTe = newEngineWithErrorThreshold(t, dir, 1)
require.Equal(t, te.shards[1].id, newTe.shards[0].id)
require.Equal(t, te.shards[0].id, newTe.shards[1].id)
require.NoError(t, newTe.ng.Close(context.Background()))
}
func TestReload(t *testing.T) {
path := t.TempDir()
t.Run("add shards", func(t *testing.T) {
const shardNum = 4
addPath := filepath.Join(path, "add")
e, currShards := engineWithShards(t, addPath, shardNum)
var rcfg ReConfiguration
for _, p := range currShards {
rcfg.AddShard(p, nil)
}
rcfg.AddShard(currShards[0], nil) // same path
require.NoError(t, e.Reload(context.Background(), rcfg))
// no new paths => no new shards
require.Equal(t, shardNum, len(e.shards))
require.Equal(t, shardNum, len(e.shardPools))
newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum))
// add new shard
rcfg.AddShard(newMeta, []shard.Option{shard.WithMetaBaseOptions(
meta.WithPath(newMeta),
meta.WithEpochState(epochState{}),
)})
require.NoError(t, e.Reload(context.Background(), rcfg))
require.Equal(t, shardNum+1, len(e.shards))
require.Equal(t, shardNum+1, len(e.shardPools))
require.NoError(t, e.Close(context.Background()))
})
t.Run("remove shards", func(t *testing.T) {
const shardNum = 4
removePath := filepath.Join(path, "remove")
e, currShards := engineWithShards(t, removePath, shardNum)
var rcfg ReConfiguration
for i := 0; i < len(currShards)-1; i++ { // without one of the shards
rcfg.AddShard(currShards[i], nil)
}
require.NoError(t, e.Reload(context.Background(), rcfg))
// removed one
require.Equal(t, shardNum-1, len(e.shards))
require.Equal(t, shardNum-1, len(e.shardPools))
require.NoError(t, e.Close(context.Background()))
})
}
// engineWithShards creates engine with specified number of shards. Returns
// slice of paths to their metabase and the engine.
func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []string) {
addPath := filepath.Join(path, "add")
currShards := make([]string, 0, num)
te := testNewEngine(t).
setShardsNumOpts(t, num, func(id int) []shard.Option {
return []shard.Option{
shard.WithLogger(test.NewLogger(t)),
shard.WithBlobStorOptions(
blobstor.WithStorages(newStorages(t, filepath.Join(addPath, strconv.Itoa(id)), errSmallSize))),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(addPath, fmt.Sprintf("%d.metabase", id))),
meta.WithPermissions(0o700),
meta.WithEpochState(epochState{}),
),
}
})
e, ids := te.engine, te.shardIDs
for _, id := range ids {
currShards = append(currShards, calculateShardID(e.shards[id.String()].DumpInfo()))
}
require.Equal(t, num, len(e.shards))
require.Equal(t, num, len(e.shardPools))
require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background()))
return e, currShards
}