package writecache

import (
	"context"
	"os"
	"path/filepath"
	"sync/atomic"
	"testing"

	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
	"go.etcd.io/bbolt"
	"go.uber.org/zap"
)

func TestFlush(t *testing.T) {
	testlogger := test.NewLogger(t)

	createCacheFn := func(t *testing.T, smallSize uint64, mb *meta.DB, bs MainStorage, opts ...Option) Cache {
		return New(
			append([]Option{
				WithLogger(testlogger),
				WithPath(filepath.Join(t.TempDir(), "writecache")),
				WithSmallObjectSize(smallSize),
				WithMetabase(mb),
				WithBlobstor(bs),
				WithDisableBackgroundFlush(),
			}, opts...)...)
	}

	errCountOpt := func() (Option, *atomic.Uint32) {
		cnt := &atomic.Uint32{}
		return WithReportErrorFunc(func(msg string, err error) {
			cnt.Add(1)
			testlogger.Warn(msg, zap.Uint32("error_count", cnt.Load()), zap.Error(err))
		}), cnt
	}

	failures := []TestFailureInjector[Option]{
		{
			Desc: "db, invalid address",
			InjectFn: func(t *testing.T, wc Cache) {
				c := wc.(*cache)
				obj := testutil.GenerateObject()
				data, err := obj.Marshal()
				require.NoError(t, err)
				require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
					b := tx.Bucket(defaultBucket)
					return b.Put([]byte{1, 2, 3}, data)
				}))
			},
		},
		{
			Desc: "db, invalid object",
			InjectFn: func(t *testing.T, wc Cache) {
				c := wc.(*cache)
				require.NoError(t, c.db.Batch(func(tx *bbolt.Tx) error {
					b := tx.Bucket(defaultBucket)
					k := []byte(oidtest.Address().EncodeToString())
					v := []byte{1, 2, 3}
					return b.Put(k, v)
				}))
			},
		},
		{
			Desc: "fs, read error",
			InjectFn: func(t *testing.T, wc Cache) {
				c := wc.(*cache)
				obj := testutil.GenerateObject()
				data, err := obj.Marshal()
				require.NoError(t, err)

				var prm common.PutPrm
				prm.Address = objectCore.AddressOf(obj)
				prm.RawData = data

				_, err = c.fsTree.Put(context.Background(), prm)
				require.NoError(t, err)

				p := prm.Address.Object().EncodeToString() + "." + prm.Address.Container().EncodeToString()
				p = filepath.Join(c.fsTree.RootPath, p[:1], p[1:])

				_, err = os.Stat(p) // sanity check
				require.NoError(t, err)
				require.NoError(t, os.Truncate(p, 0)) // corrupt the file contents, so that it can't be unmarshalled
			},
		},
		{
			Desc: "fs, invalid object",
			InjectFn: func(t *testing.T, wc Cache) {
				c := wc.(*cache)
				var prm common.PutPrm
				prm.Address = oidtest.Address()
				prm.RawData = []byte{1, 2, 3}
				_, err := c.fsTree.Put(context.Background(), prm)
				require.NoError(t, err)
			},
		},
	}

	runFlushTest(t, createCacheFn, errCountOpt, failures...)
}

const (
	objCount  = 4
	smallSize = 256
)

type CreateCacheFunc[Option any] func(
	t *testing.T,
	smallSize uint64,
	meta *meta.DB,
	bs MainStorage,
	opts ...Option,
) Cache

type TestFailureInjector[Option any] struct {
	Desc     string
	InjectFn func(*testing.T, Cache)
}

type objectPair struct {
	addr oid.Address
	obj  *objectSDK.Object
}

func runFlushTest[Option any](
	t *testing.T,
	createCacheFn CreateCacheFunc[Option],
	errCountOption func() (Option, *atomic.Uint32),
	failures ...TestFailureInjector[Option],
) {
	t.Run("no errors", func(t *testing.T) {
		wc, bs, mb := newCache(t, createCacheFn, smallSize)
		defer func() { require.NoError(t, wc.Close()) }()
		objects := putObjects(t, wc)

		require.NoError(t, bs.SetMode(mode.ReadWrite))
		require.NoError(t, mb.SetMode(mode.ReadWrite))

		require.NoError(t, wc.Flush(context.Background(), false, false))

		check(t, mb, bs, objects)
	})

	t.Run("flush on moving to degraded mode", func(t *testing.T) {
		wc, bs, mb := newCache(t, createCacheFn, smallSize)
		defer func() { require.NoError(t, wc.Close()) }()
		objects := putObjects(t, wc)

		// Blobstor is read-only, so we expect en error from `flush` here.
		require.Error(t, wc.SetMode(mode.Degraded))

		require.NoError(t, bs.SetMode(mode.ReadWrite))
		require.NoError(t, mb.SetMode(mode.ReadWrite))
		require.NoError(t, wc.SetMode(mode.Degraded))

		check(t, mb, bs, objects)
	})

	t.Run("ignore errors", func(t *testing.T) {
		for _, f := range failures {
			f := f
			t.Run(f.Desc, func(t *testing.T) {
				errCountOpt, errCount := errCountOption()
				wc, bs, mb := newCache(t, createCacheFn, smallSize, errCountOpt)
				defer func() { require.NoError(t, wc.Close()) }()
				objects := putObjects(t, wc)
				f.InjectFn(t, wc)

				require.NoError(t, bs.SetMode(mode.ReadWrite))
				require.NoError(t, mb.SetMode(mode.ReadWrite))

				require.Equal(t, uint32(0), errCount.Load())
				require.Error(t, wc.Flush(context.Background(), false, false))
				require.Greater(t, errCount.Load(), uint32(0))
				require.NoError(t, wc.Flush(context.Background(), true, false))

				check(t, mb, bs, objects)
			})
		}
	})
}

func newCache[Option any](
	t *testing.T,
	createCacheFn CreateCacheFunc[Option],
	smallSize uint64,
	opts ...Option,
) (Cache, *blobstor.BlobStor, *meta.DB) {
	dir := t.TempDir()
	mb := meta.New(
		meta.WithPath(filepath.Join(dir, "meta")),
		meta.WithEpochState(dummyEpoch{}))
	require.NoError(t, mb.Open(context.Background(), mode.ReadWrite))
	require.NoError(t, mb.Init())

	bs := blobstor.New(blobstor.WithStorages([]blobstor.SubStorage{
		{
			Storage: fstree.New(
				fstree.WithPath(filepath.Join(dir, "blob")),
				fstree.WithDepth(0),
				fstree.WithDirNameLen(1)),
		},
	}))
	require.NoError(t, bs.Open(context.Background(), mode.ReadWrite))
	require.NoError(t, bs.Init())

	wc := createCacheFn(t, smallSize, mb, bs, opts...)
	require.NoError(t, wc.Open(context.Background(), mode.ReadWrite))
	require.NoError(t, wc.Init())

	// First set mode for metabase and blobstor to prevent background flushes.
	require.NoError(t, mb.SetMode(mode.ReadOnly))
	require.NoError(t, bs.SetMode(mode.ReadOnly))

	return wc, bs, mb
}

func putObject(t *testing.T, c Cache, size int) objectPair {
	obj := testutil.GenerateObjectWithSize(size)
	data, err := obj.Marshal()
	require.NoError(t, err)

	var prm common.PutPrm
	prm.Address = objectCore.AddressOf(obj)
	prm.Object = obj
	prm.RawData = data

	_, err = c.Put(context.Background(), prm)
	require.NoError(t, err)

	return objectPair{prm.Address, prm.Object}
}

func putObjects(t *testing.T, c Cache) []objectPair {
	objects := make([]objectPair, objCount)
	for i := range objects {
		objects[i] = putObject(t, c, 1+(i%2)*smallSize)
	}
	return objects
}

func check(t *testing.T, mb *meta.DB, bs *blobstor.BlobStor, objects []objectPair) {
	for i := range objects {
		var mPrm meta.StorageIDPrm
		mPrm.SetAddress(objects[i].addr)

		mRes, err := mb.StorageID(context.Background(), mPrm)
		require.NoError(t, err)

		var prm common.GetPrm
		prm.Address = objects[i].addr
		prm.StorageID = mRes.StorageID()

		res, err := bs.Get(context.Background(), prm)
		require.NoError(t, err)
		require.Equal(t, objects[i].obj, res.Object)
	}
}

type dummyEpoch struct{}

func (dummyEpoch) CurrentEpoch() uint64 {
	return 0
}