package engine

import (
	"context"
	"fmt"
	"strconv"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	objecttest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/test"
	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"
)

func TestStorageEngine_Inhume(t *testing.T) {
	cnr := cidtest.ID()
	splitID := objectSDK.NewSplitID()

	fs := objectSDK.SearchFilters{}
	fs.AddRootFilter()

	tombstoneID := object.AddressOf(testutil.GenerateObjectWithCID(cnr))
	parent := testutil.GenerateObjectWithCID(cnr)

	child := testutil.GenerateObjectWithCID(cnr)
	child.SetParent(parent)
	idParent, _ := parent.ID()
	child.SetParentID(idParent)
	child.SetSplitID(splitID)

	link := testutil.GenerateObjectWithCID(cnr)
	link.SetParent(parent)
	link.SetParentID(idParent)
	idChild, _ := child.ID()
	link.SetChildren(idChild)
	link.SetSplitID(splitID)

	t.Run("delete small object", func(t *testing.T) {
		t.Parallel()
		e := testNewEngine(t).setShardsNum(t, 1).prepare(t).engine
		defer func() { require.NoError(t, e.Close(context.Background())) }()

		err := Put(context.Background(), e, parent, false)
		require.NoError(t, err)

		var inhumePrm InhumePrm
		inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))

		err = e.Inhume(context.Background(), inhumePrm)
		require.NoError(t, err)

		addrs, err := Select(context.Background(), e, cnr, false, fs)
		require.NoError(t, err)
		require.Empty(t, addrs)
	})

	t.Run("delete big object", func(t *testing.T) {
		t.Parallel()

		te := testNewEngine(t).setShardsNum(t, 2).prepare(t)
		e := te.engine
		defer func() { require.NoError(t, e.Close(context.Background())) }()

		s1, s2 := te.shards[0], te.shards[1]

		var putChild shard.PutPrm
		putChild.SetObject(child)
		_, err := s1.Put(context.Background(), putChild)
		require.NoError(t, err)

		var putLink shard.PutPrm
		putLink.SetObject(link)
		_, err = s2.Put(context.Background(), putLink)
		require.NoError(t, err)

		var inhumePrm InhumePrm
		inhumePrm.WithTarget(tombstoneID, object.AddressOf(parent))

		err = e.Inhume(context.Background(), inhumePrm)
		require.NoError(t, err)

		addrs, err := Select(context.Background(), e, cnr, false, fs)
		require.NoError(t, err)
		require.Empty(t, addrs)
	})
}

func TestStorageEngine_ECInhume(t *testing.T) {
	parentObjectAddress := oidtest.Address()
	containerID := parentObjectAddress.Container()

	chunkObject0 := testutil.GenerateObjectWithCID(containerID)
	chunkObject0.SetECHeader(objectSDK.NewECHeader(
		objectSDK.ECParentInfo{
			ID: parentObjectAddress.Object(),
		}, 0, 4, []byte{}, 0))

	chunkObject1 := testutil.GenerateObjectWithCID(containerID)
	chunkObject1.SetECHeader(objectSDK.NewECHeader(
		objectSDK.ECParentInfo{
			ID: parentObjectAddress.Object(),
		}, 1, 4, []byte{}, 0))

	tombstone := objectSDK.NewTombstone()
	tombstone.SetMembers([]oid.ID{parentObjectAddress.Object()})
	payload, err := tombstone.Marshal()
	require.NoError(t, err)
	tombstoneObject := testutil.GenerateObjectWithCID(containerID)
	tombstoneObject.SetType(objectSDK.TypeTombstone)
	tombstoneObject.SetPayload(payload)
	tombstoneObjectAddress := object.AddressOf(tombstoneObject)

	e := testNewEngine(t).setShardsNum(t, 5).prepare(t).engine
	defer func() { require.NoError(t, e.Close(context.Background())) }()

	require.NoError(t, Put(context.Background(), e, chunkObject0, false))

	require.NoError(t, Put(context.Background(), e, tombstoneObject, false))

	var inhumePrm InhumePrm
	inhumePrm.WithTarget(tombstoneObjectAddress, parentObjectAddress)
	err = e.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)

	var alreadyRemoved *apistatus.ObjectAlreadyRemoved

	require.ErrorAs(t, Put(context.Background(), e, chunkObject0, false), &alreadyRemoved)

	require.ErrorAs(t, Put(context.Background(), e, chunkObject1, false), &alreadyRemoved)
}

func TestInhumeExpiredRegularObject(t *testing.T) {
	t.Parallel()

	const currEpoch = 42
	const objectExpiresAfter = currEpoch - 1

	engine := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
		return []shard.Option{
			shard.WithDisabledGC(),
			shard.WithMetaBaseOptions(append(
				testGetDefaultMetabaseOptions(t),
				meta.WithEpochState(epochState{currEpoch}),
			)...),
		}
	}).prepare(t).engine

	cnr := cidtest.ID()

	generateAndPutObject := func() *objectSDK.Object {
		obj := testutil.GenerateObjectWithCID(cnr)
		testutil.AddAttribute(obj, objectV2.SysAttributeExpEpoch, strconv.Itoa(objectExpiresAfter))

		var putPrm PutPrm
		putPrm.Object = obj
		require.NoError(t, engine.Put(context.Background(), putPrm))
		return obj
	}

	t.Run("inhume with tombstone", func(t *testing.T) {
		obj := generateAndPutObject()
		ts := oidtest.Address()
		ts.SetContainer(cnr)

		var prm InhumePrm
		prm.WithTarget(ts, object.AddressOf(obj))
		err := engine.Inhume(context.Background(), prm)
		require.NoError(t, err)
	})

	t.Run("inhume without tombstone", func(t *testing.T) {
		obj := generateAndPutObject()

		var prm InhumePrm
		prm.MarkAsGarbage(object.AddressOf(obj))
		err := engine.Inhume(context.Background(), prm)
		require.NoError(t, err)
	})
}

func BenchmarkInhumeMultipart(b *testing.B) {
	// The benchmark result insignificantly depends on the number of shards,
	// so do not use it as a benchmark parameter, just set it big enough.
	numShards := 100

	for numObjects := 1; numObjects <= 10000; numObjects *= 10 {
		b.Run(
			fmt.Sprintf("objects=%d", numObjects),
			func(b *testing.B) {
				benchmarkInhumeMultipart(b, numShards, numObjects)
			},
		)
	}
}

func benchmarkInhumeMultipart(b *testing.B, numShards, numObjects int) {
	b.StopTimer()

	engine := testNewEngine(b, WithShardPoolSize(uint32(numObjects))).
		setShardsNum(b, numShards).prepare(b).engine
	defer func() { require.NoError(b, engine.Close(context.Background())) }()

	cnt := cidtest.ID()
	eg := errgroup.Group{}

	for range b.N {
		addrs := make([]oid.Address, numObjects)

		for i := range numObjects {
			prm := PutPrm{}

			prm.Object = objecttest.Object().Parent()
			prm.Object.SetContainerID(cnt)
			prm.Object.SetType(objectSDK.TypeRegular)

			addrs[i] = object.AddressOf(prm.Object)

			eg.Go(func() error {
				return engine.Put(context.Background(), prm)
			})
		}
		require.NoError(b, eg.Wait())

		ts := oidtest.Address()
		ts.SetContainer(cnt)

		prm := InhumePrm{}
		prm.WithTarget(ts, addrs...)

		b.StartTimer()
		err := engine.Inhume(context.Background(), prm)
		require.NoError(b, err)
		b.StopTimer()
	}
}