package shard

import (
	"context"
	"os"
	"path/filepath"
	"testing"

	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
	checksumtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum/test"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	sessiontest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session/test"
	usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
	"github.com/stretchr/testify/require"
)

// TestShardReload exercises Shard.Reload on a shard configured with an FSTree
// blobstor, a metabase and a pilorama that all live under a per-test
// temporary directory. The scenarios covered are:
//   - reloading with the very same options keeps the stored objects visible;
//   - pointing the metabase at a new path without resync makes Exists report
//     false for the previously stored objects, while new puts still succeed;
//   - pointing the metabase at a new path with resync makes every object
//     (including the one put in between) visible again;
//   - a reload that fails because the metabase path is a regular file created
//     with mode 0 makes Put return ErrReadOnlyMode while Exists keeps
//     working, and a later reload succeeds once that file is removed.
func TestShardReload(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	// t.TempDir gives every run a unique directory and removes it after the
	// test (and its subtests) finish, so stale state from an interrupted run
	// can never leak into the next one.
	p := t.TempDir()
	l := test.NewLogger(t)
	blobOpts := []blobstor.Option{
		blobstor.WithLogger(l),
		blobstor.WithStorages([]blobstor.SubStorage{
			{
				Storage: fstree.New(
					fstree.WithPath(filepath.Join(p, "blob")),
					fstree.WithDepth(1)),
			},
		}),
	}

	metaOpts := []meta.Option{
		meta.WithPath(filepath.Join(p, "meta")),
		meta.WithEpochState(epochState{}),
	}

	opts := []Option{
		WithID(NewIDFromBytes([]byte{})),
		WithLogger(l),
		WithBlobStorOptions(blobOpts...),
		WithMetaBaseOptions(metaOpts...),
		WithPiloramaOptions(
			pilorama.WithPath(filepath.Join(p, "pilorama"))),
	}

	sh := New(opts...)
	require.NoError(t, sh.Open(ctx))
	require.NoError(t, sh.Init(ctx))

	// The deferred Close runs when the test function returns, i.e. before the
	// temporary directory cleanup registered by t.TempDir.
	defer func() {
		require.NoError(t, sh.Close())
	}()

	objects := make([]objAddr, 5)
	for i := range objects {
		objects[i].obj = newObject()
		objects[i].addr = objectCore.AddressOf(objects[i].obj)
		require.NoError(t, putObject(sh, objects[i].obj))
	}

	// checkHasObjects asserts that Exists reports the expected status for
	// every object collected in objects so far.
	checkHasObjects := func(t *testing.T, exists bool) {
		for i := range objects {
			var prm ExistsPrm
			prm.SetAddress(objects[i].addr)

			res, err := sh.Exists(ctx, prm)
			require.NoError(t, err)
			require.Equal(t, exists, res.Exists(), "unexpected existence status of object #%d", i)
		}
	}

	checkHasObjects(t, true)

	t.Run("same config, no-op", func(t *testing.T) {
		require.NoError(t, sh.Reload(ctx, opts...))
		checkHasObjects(t, true)
	})

	t.Run("open meta at new path", func(t *testing.T) {
		// newShardOpts returns the base options followed by a metabase
		// override for metaPath and the requested resync flag.
		newShardOpts := func(metaPath string, resync bool) []Option {
			metaOpts := []meta.Option{meta.WithPath(metaPath), meta.WithEpochState(epochState{})}
			// The full slice expression caps the capacity, so append always
			// copies and never writes into the backing array of opts.
			base := opts[:len(opts):len(opts)]
			return append(base, WithMetaBaseOptions(metaOpts...), WithRefillMetabase(resync))
		}

		newOpts := newShardOpts(filepath.Join(p, "meta1"), false)
		require.NoError(t, sh.Reload(ctx, newOpts...))

		checkHasObjects(t, false) // new path, but no resync

		t.Run("can put objects", func(t *testing.T) {
			obj := newObject()
			require.NoError(t, putObject(sh, obj))
			objects = append(objects, objAddr{obj: obj, addr: objectCore.AddressOf(obj)})
		})

		newOpts = newShardOpts(filepath.Join(p, "meta2"), true)
		require.NoError(t, sh.Reload(ctx, newOpts...))

		checkHasObjects(t, true) // all objects are restored, including the new one

		t.Run("reload failed", func(t *testing.T) {
			// Occupy the metabase path with a regular file created with
			// mode 0 so that the reload below is expected to fail.
			badPath := filepath.Join(p, "meta3")
			require.NoError(t, os.WriteFile(badPath, []byte{1}, 0))

			newOpts = newShardOpts(badPath, true)
			require.Error(t, sh.Reload(ctx, newOpts...))

			// Cleanup is done, no panic.
			obj := newObject()
			require.ErrorIs(t, putObject(sh, obj), ErrReadOnlyMode)

			// Old objects are still accessible.
			checkHasObjects(t, true)

			// Successive reload produces no undesired effects.
			require.NoError(t, os.RemoveAll(badPath))
			require.NoError(t, sh.Reload(ctx, newOpts...))

			obj = newObject()
			require.NoError(t, putObject(sh, obj))

			objects = append(objects, objAddr{obj: obj, addr: objectCore.AddressOf(obj)})
			checkHasObjects(t, true)
		})
	})
}

// putObject wraps obj into a PutPrm and stores it in sh using a background
// context. The put result is discarded; only the error from Shard.Put is
// returned.
func putObject(sh *Shard, obj *objectSDK.Object) error {
	var putPrm PutPrm
	putPrm.SetObject(obj)

	if _, err := sh.Put(context.Background(), putPrm); err != nil {
		return err
	}
	return nil
}

// newObject builds a regular object for tests: the ID, container ID, owner
// ID, session token and both checksums come from the SDK test generators,
// the version is the current SDK version and the payload is the fixed
// three-byte sequence {1, 2, 3}.
func newObject() *objectSDK.Object {
	obj := objectSDK.New()

	// Header: version, type and identity-related fields.
	curVersion := version.Current()
	obj.SetVersion(&curVersion)
	obj.SetType(objectSDK.TypeRegular)
	obj.SetContainerID(cidtest.ID())
	obj.SetOwnerID(usertest.ID())
	obj.SetID(oidtest.ID())
	obj.SetSessionToken(sessiontest.Object())

	// Payload together with its declared size and checksums.
	payload := []byte{1, 2, 3}
	obj.SetPayload(payload)
	obj.SetPayloadSize(3)
	obj.SetPayloadChecksum(checksumtest.Checksum())
	obj.SetPayloadHomomorphicHash(checksumtest.Checksum())

	return obj
}