package engine

import (
	"context"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strconv"
	"sync/atomic"
	"testing"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	"github.com/stretchr/testify/require"
	"go.etcd.io/bbolt"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"
)

// TestInitializationFailure checks that shard is initialized and closed even if media
// under any single component is absent.
func TestInitializationFailure(t *testing.T) {
	type openFileFunc func(string, int, fs.FileMode) (*os.File, error)

	type testShardOpts struct {
		openFileMetabase   openFileFunc
		openFileWriteCache openFileFunc
		openFilePilorama   openFileFunc
	}

	testShard := func(opts testShardOpts) ([]shard.Option, *teststore.TestStore, *teststore.TestStore) {
		sid, err := generateShardID()
		require.NoError(t, err)

		tempDir := t.TempDir()
		blobstorPath := filepath.Join(tempDir, "bs")
		metabasePath := filepath.Join(tempDir, "mb")
		writecachePath := filepath.Join(tempDir, "wc")
		piloramaPath := filepath.Join(tempDir, "pl")

		storages, smallFileStorage, largeFileStorage := newTestStorages(blobstorPath, 1<<20)

		return []shard.Option{
			shard.WithID(sid),
			shard.WithLogger(&logger.Logger{Logger: zaptest.NewLogger(t)}),
			shard.WithBlobStorOptions(
				blobstor.WithStorages(storages)),
			shard.WithMetaBaseOptions(
				meta.WithBoltDBOptions(&bbolt.Options{
					Timeout:  100 * time.Millisecond,
					OpenFile: opts.openFileMetabase,
				}),
				meta.WithPath(metabasePath),
				meta.WithPermissions(0700),
				meta.WithEpochState(epochState{})),
			shard.WithWriteCache(true),
			shard.WithWriteCacheOptions(
				writecache.WithPath(writecachePath),
				writecache.WithOpenFile(opts.openFileWriteCache),
			),
			shard.WithPiloramaOptions(
				pilorama.WithPath(piloramaPath),
				pilorama.WithOpenFile(opts.openFilePilorama),
			),
		}, smallFileStorage, largeFileStorage
	}

	t.Run("blobstor", func(t *testing.T) {
		shardOpts, _, largeFileStorage := testShard(testShardOpts{
			openFileMetabase:   os.OpenFile,
			openFileWriteCache: os.OpenFile,
			openFilePilorama:   os.OpenFile,
		})
		largeFileStorage.SetOption(teststore.WithOpen(func(ro bool) error {
			return teststore.ErrDiskExploded
		}))
		beforeReload := func() {
			largeFileStorage.SetOption(teststore.WithOpen(nil))
		}
		testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
	})
	t.Run("metabase", func(t *testing.T) {
		var openFileMetabaseSucceed atomic.Bool
		openFileMetabase := func(p string, f int, mode fs.FileMode) (*os.File, error) {
			if openFileMetabaseSucceed.Load() {
				return os.OpenFile(p, f, mode)
			}
			return nil, teststore.ErrDiskExploded
		}
		beforeReload := func() {
			openFileMetabaseSucceed.Store(true)
		}
		shardOpts, _, _ := testShard(testShardOpts{
			openFileMetabase:   openFileMetabase,
			openFileWriteCache: os.OpenFile,
			openFilePilorama:   os.OpenFile,
		})
		testEngineFailInitAndReload(t, true, shardOpts, beforeReload)
	})
	t.Run("write-cache", func(t *testing.T) {
		var openFileWriteCacheSucceed atomic.Bool
		openFileWriteCache := func(p string, f int, mode fs.FileMode) (*os.File, error) {
			if openFileWriteCacheSucceed.Load() {
				return os.OpenFile(p, f, mode)
			}
			return nil, teststore.ErrDiskExploded
		}
		beforeReload := func() {
			openFileWriteCacheSucceed.Store(true)
		}
		shardOpts, _, _ := testShard(testShardOpts{
			openFileMetabase:   os.OpenFile,
			openFileWriteCache: openFileWriteCache,
			openFilePilorama:   os.OpenFile,
		})
		testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
	})
	t.Run("pilorama", func(t *testing.T) {
		var openFilePiloramaSucceed atomic.Bool
		openFilePilorama := func(p string, f int, mode fs.FileMode) (*os.File, error) {
			if openFilePiloramaSucceed.Load() {
				return os.OpenFile(p, f, mode)
			}
			return nil, teststore.ErrDiskExploded
		}
		beforeReload := func() {
			openFilePiloramaSucceed.Store(true)
		}
		shardOpts, _, _ := testShard(testShardOpts{
			openFileMetabase:   os.OpenFile,
			openFileWriteCache: os.OpenFile,
			openFilePilorama:   openFilePilorama,
		})
		testEngineFailInitAndReload(t, false, shardOpts, beforeReload)
	})
}

// testEngineFailInitAndReload adds a shard built from opts to a fresh engine
// and expects it to fail: on AddShard when errOnAdd is set, otherwise on
// Open or Init. In both cases the engine must end up with zero shards.
// It then calls beforeReload (which is expected to repair the environment)
// and checks that Reload with the same options leaves exactly one shard.
//
// The engine is closed when the calling (sub)test finishes.
func testEngineFailInitAndReload(t *testing.T, errOnAdd bool, opts []shard.Option, beforeReload func()) {
	var configID string

	e := New()
	// Without this the shard brought up by Reload would stay open after the
	// test, keeping its resources alive past the test's lifetime.
	t.Cleanup(func() {
		require.NoError(t, e.Close())
	})

	// shardCount reports the number of shards currently held by the engine.
	shardCount := func() int {
		e.mtx.RLock()
		defer e.mtx.RUnlock()
		return len(e.shards)
	}

	_, err := e.AddShard(opts...)
	if errOnAdd {
		require.Error(t, err)
		// This branch is only taken when we cannot update shard ID in the metabase.
		// The id cannot be encountered during normal operation, but it is ok for tests:
		// it is only compared for equality with other ids and we have 0 shards here.
		configID = "id"
	} else {
		require.NoError(t, err)

		// Remember the configuration ID of the only shard before it is dropped.
		e.mtx.RLock()
		var id string
		for id = range e.shards {
			break
		}
		configID = calculateShardID(e.shards[id].Shard.DumpInfo())
		e.mtx.RUnlock()

		// Either Open fails, or Open succeeds and Init must fail.
		err = e.Open()
		if err == nil {
			require.Error(t, e.Init(context.Background()))
		}
	}

	require.Equal(t, 0, shardCount())

	beforeReload()

	require.NoError(t, e.Reload(context.Background(), ReConfiguration{
		shards: map[string][]shard.Option{configID: opts},
	}))

	require.Equal(t, 1, shardCount())
}

// TestExecBlocks verifies the engine's execution blocking: while blocked,
// data operations fail with the blocking error; after resuming they work
// again; after Close both operations and ResumeExecution fail.
func TestExecBlocks(t *testing.T) {
	// The shard count is irrelevant for this test; 2 is several but not many.
	e := testNewEngine(t).setShardsNum(t, 2).engine

	t.Cleanup(func() {
		// Best-effort removal of anything left under the test's name.
		_ = os.RemoveAll(t.Name())
	})

	ctx := context.Background()

	// Store an object so that there is something to query.
	obj := testutil.GenerateObjectWithCID(cidtest.ID())
	addr := object.AddressOf(obj)
	require.NoError(t, Put(ctx, e, obj))

	// head performs a data-related operation (any would do) and returns its error.
	head := func() error {
		_, err := Head(ctx, e, addr)
		return err
	}

	// Block execution: operations must surface the blocking error.
	errBlock := errors.New("block exec err")
	require.NoError(t, e.BlockExecution(errBlock))
	require.ErrorIs(t, head(), errBlock)

	// Resume execution: operations succeed again.
	require.NoError(t, e.ResumeExecution())
	require.NoError(t, head())

	// Close the engine: neither operations nor resuming are possible anymore.
	require.NoError(t, e.Close())
	require.Error(t, head())
	require.Error(t, e.ResumeExecution())
}

// TestPersistentShardID checks that shard IDs survive an engine restart and
// that they follow the metabase files: after swapping the metabase files of
// the first two shards, the reopened engine reports their IDs swapped.
func TestPersistentShardID(t *testing.T) {
	dir := t.TempDir()

	first := newEngineWithErrorThreshold(t, dir, 1)
	checkShardState(t, first.ng, first.shards[0].id, 0, mode.ReadWrite)
	require.NoError(t, first.ng.Close())

	// Reopening on the same directory must yield the same IDs in the same order.
	reopened := newEngineWithErrorThreshold(t, dir, 1)
	for i := range reopened.shards {
		require.Equal(t, first.shards[i].id, reopened.shards[i].id)
	}
	require.NoError(t, reopened.ng.Close())

	// metabasePath looks up the metabase file of the idx-th original shard.
	metabasePath := func(idx int) string {
		key := first.shards[idx].id.String()
		return reopened.ng.shards[key].Shard.DumpInfo().MetaBaseInfo.Path
	}
	pathA := metabasePath(0)
	pathB := metabasePath(1)

	// Swap the two metabase files through a temporary name.
	swap := filepath.Join(dir, "tmp")
	require.NoError(t, os.Rename(pathA, swap))
	require.NoError(t, os.Rename(pathB, pathA))
	require.NoError(t, os.Rename(swap, pathB))

	reopened = newEngineWithErrorThreshold(t, dir, 1)
	require.Equal(t, first.shards[1].id, reopened.shards[0].id)
	require.Equal(t, first.shards[0].id, reopened.shards[1].id)
	require.NoError(t, reopened.ng.Close())
}

// TestReload checks that StorageEngine.Reload adds and removes shards
// according to the supplied ReConfiguration, keeping the shard map and the
// shard pool map in sync.
func TestReload(t *testing.T) {
	path := t.TempDir()

	t.Run("add shards", func(t *testing.T) {
		const shardNum = 4
		addPath := filepath.Join(path, "add")

		e, currShards := engineWithShards(t, addPath, shardNum)
		// Close the engine when the subtest ends so its shards are not left
		// open, matching the other tests in this file.
		t.Cleanup(func() {
			require.NoError(t, e.Close())
		})

		var rcfg ReConfiguration
		for _, p := range currShards {
			rcfg.AddShard(p, nil)
		}

		rcfg.AddShard(currShards[0], nil) // same path
		require.NoError(t, e.Reload(context.Background(), rcfg))

		// no new paths => no new shards
		require.Equal(t, shardNum, len(e.shards))
		require.Equal(t, shardNum, len(e.shardPools))

		newMeta := filepath.Join(addPath, fmt.Sprintf("%d.metabase", shardNum))

		// add new shard
		rcfg.AddShard(newMeta, []shard.Option{shard.WithMetaBaseOptions(
			meta.WithPath(newMeta),
			meta.WithEpochState(epochState{}),
		)})
		require.NoError(t, e.Reload(context.Background(), rcfg))

		require.Equal(t, shardNum+1, len(e.shards))
		require.Equal(t, shardNum+1, len(e.shardPools))
	})

	t.Run("remove shards", func(t *testing.T) {
		const shardNum = 4
		removePath := filepath.Join(path, "remove")

		e, currShards := engineWithShards(t, removePath, shardNum)
		// Close the engine when the subtest ends so its remaining shards are
		// not left open.
		t.Cleanup(func() {
			require.NoError(t, e.Close())
		})

		var rcfg ReConfiguration
		for i := 0; i < len(currShards)-1; i++ { // without one of the shards
			rcfg.AddShard(currShards[i], nil)
		}

		require.NoError(t, e.Reload(context.Background(), rcfg))

		// removed one
		require.Equal(t, shardNum-1, len(e.shards))
		require.Equal(t, shardNum-1, len(e.shardPools))
	})
}

// engineWithShards creates an engine with num shards, then opens and
// initializes it. Shard data (blobstor and metabase) is placed under the
// "add" subdirectory of path.
//
// It returns the engine followed by the configuration IDs of its shards, as
// computed by calculateShardID from each shard's DumpInfo, in te.shardIDs
// order. These IDs are suitable as keys for ReConfiguration.AddShard.
func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []string) {
	t.Helper()

	// NOTE: the "add" component is appended regardless of what the caller
	// passed in path.
	shardsDir := filepath.Join(path, "add")

	currShards := make([]string, 0, num)

	te := testNewEngine(t).
		setShardsNumOpts(t, num, func(id int) []shard.Option {
			return []shard.Option{
				shard.WithLogger(&logger.Logger{Logger: zap.L()}),
				shard.WithBlobStorOptions(
					blobstor.WithStorages(newStorages(filepath.Join(shardsDir, strconv.Itoa(id)), errSmallSize))),
				shard.WithMetaBaseOptions(
					meta.WithPath(filepath.Join(shardsDir, fmt.Sprintf("%d.metabase", id))),
					meta.WithPermissions(0700),
					meta.WithEpochState(epochState{}),
				),
			}
		})
	e, ids := te.engine, te.shardIDs

	// Configuration IDs are collected before the engine is opened.
	for _, id := range ids {
		currShards = append(currShards, calculateShardID(e.shards[id.String()].DumpInfo()))
	}

	require.Equal(t, num, len(e.shards))
	require.Equal(t, num, len(e.shardPools))

	require.NoError(t, e.Open())
	require.NoError(t, e.Init(context.Background()))

	return e, currShards
}