package shard

import (
	"context"
	"errors"
	"testing"

	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

// Test_GCDropsLockedExpiredSimpleObject stores an object whose expiration
// epoch attribute is 101, locks it with a LOCK object whose expiration epoch
// attribute is 103, advances the epoch state from 100 to 105, delivers a
// new-epoch event to the shard GC and checks that reading the object then
// fails with either "not found" or "expired".
func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	currentEpoch := &epochState{Value: 100}

	// The pseudo worker pool gives synchronous event processing.
	syncGCPool := WithGCWorkerPoolInitializer(func(int) util.WorkerPool {
		return util.NewPseudoWorkerPool()
	})
	sh := newCustomShard(t, false, shardOptions{
		metaOptions:            []meta.Option{meta.WithEpochState(currentEpoch)},
		additionalShardOptions: []Option{syncGCPool},
	})
	defer func() { require.NoError(t, sh.Close()) }()

	// expiresAt builds the system expiration-epoch attribute with the given value.
	expiresAt := func(epochValue string) objectSDK.Attribute {
		var attr objectSDK.Attribute
		attr.SetKey(objectV2.SysAttributeExpEpoch)
		attr.SetValue(epochValue)
		return attr
	}

	containerID := cidtest.ID()

	obj := testutil.GenerateObjectWithCID(containerID)
	obj.SetAttributes(expiresAt("101"))
	objID, _ := obj.ID()

	lock := testutil.GenerateObjectWithCID(containerID)
	lock.SetType(objectSDK.TypeLock)
	lock.SetAttributes(expiresAt("103"))
	lockID, _ := lock.ID()

	// Store the object, register the lock relation, then store the lock itself.
	var prm PutPrm
	prm.SetObject(obj)
	_, err := sh.Put(ctx, prm)
	require.NoError(t, err)

	require.NoError(t, sh.Lock(ctx, containerID, lockID, []oid.ID{objID}))

	prm.SetObject(lock)
	_, err = sh.Put(ctx, prm)
	require.NoError(t, err)

	// Jump past both expiration values and let the GC handle the new epoch.
	currentEpoch.Value = 105
	sh.gc.handleEvent(ctx, EventNewEpoch(currentEpoch.Value))

	var getPrm GetPrm
	getPrm.SetAddress(objectCore.AddressOf(obj))
	_, err = sh.Get(ctx, getPrm)

	gone := client.IsErrObjectNotFound(err) || IsErrObjectExpired(err)
	require.True(t, gone, "expired object must be deleted")
}

// Test_GCDropsLockedExpiredComplexObject builds a split object (ten children
// plus a link object referring to a payload-less parent whose expiration epoch
// attribute is 101), stores the children and the link, locks children, parent
// and link with a LOCK object whose expiration epoch attribute is 103, and
// checks that:
//   - before the epoch changes, Get on the parent yields a SplitInfoError;
//   - after the epoch state moves to 105 and the GC handles the new-epoch
//     event, Get on the parent fails with "not found" or "expired".
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	currentEpoch := &epochState{Value: 100}

	// expiresAt builds the system expiration-epoch attribute with the given value.
	expiresAt := func(epochValue string) objectSDK.Attribute {
		var attr objectSDK.Attribute
		attr.SetKey(objectV2.SysAttributeExpEpoch)
		attr.SetValue(epochValue)
		return attr
	}

	containerID := cidtest.ID()
	parentID := oidtest.ID()
	splitID := objectSDK.NewSplitID()

	parent := testutil.GenerateObjectWithCID(containerID)
	parent.SetID(parentID)
	parent.SetPayload(nil)
	parent.SetAttributes(expiresAt("101"))

	const childCount = 10
	children := make([]*objectSDK.Object, childCount)
	childIDs := make([]oid.ID, childCount)
	for i := 0; i < childCount; i++ {
		child := testutil.GenerateObjectWithCID(containerID)
		if i > 0 {
			// Chain every child to its predecessor.
			child.SetPreviousID(childIDs[i-1])
		}
		if i == childCount-1 {
			// Only the last child carries the parent.
			child.SetParent(parent)
		}
		child.SetSplitID(splitID)
		child.SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})

		children[i] = child
		childIDs[i], _ = child.ID()
	}

	link := testutil.GenerateObjectWithCID(containerID)
	link.SetParent(parent)
	link.SetParentID(parentID)
	link.SetSplitID(splitID)
	link.SetChildren(childIDs...)
	linkID, _ := link.ID()

	// The pseudo worker pool gives synchronous event processing.
	syncGCPool := WithGCWorkerPoolInitializer(func(int) util.WorkerPool {
		return util.NewPseudoWorkerPool()
	})
	sh := newCustomShard(t, false, shardOptions{
		metaOptions:            []meta.Option{meta.WithEpochState(currentEpoch)},
		additionalShardOptions: []Option{syncGCPool},
	})
	defer func() { require.NoError(t, sh.Close()) }()

	lock := testutil.GenerateObjectWithCID(containerID)
	lock.SetType(objectSDK.TypeLock)
	lock.SetAttributes(expiresAt("103"))
	lockID, _ := lock.ID()

	// Store every child and then the link; the parent itself is never put.
	var prm PutPrm
	for i := range children {
		prm.SetObject(children[i])
		_, putErr := sh.Put(ctx, prm)
		require.NoError(t, putErr)
	}

	prm.SetObject(link)
	_, err := sh.Put(ctx, prm)
	require.NoError(t, err)

	// Lock all children, the parent and the link with a single lock ID.
	lockedIDs := make([]oid.ID, 0, len(childIDs)+2)
	lockedIDs = append(lockedIDs, childIDs...)
	lockedIDs = append(lockedIDs, parentID, linkID)
	require.NoError(t, sh.Lock(ctx, containerID, lockID, lockedIDs))

	prm.SetObject(lock)
	_, err = sh.Put(ctx, prm)
	require.NoError(t, err)

	var getPrm GetPrm
	getPrm.SetAddress(objectCore.AddressOf(parent))

	// Before the epoch change the parent must resolve to split info.
	_, err = sh.Get(ctx, getPrm)
	var splitInfoErr *objectSDK.SplitInfoError
	isSplitInfo := errors.As(err, &splitInfoErr)
	require.True(t, isSplitInfo, "split info must be provided")

	currentEpoch.Value = 105
	sh.gc.handleEvent(ctx, EventNewEpoch(currentEpoch.Value))

	_, err = sh.Get(ctx, getPrm)
	gone := client.IsErrObjectNotFound(err) || IsErrObjectExpired(err)
	require.True(t, gone, "expired complex object must be deleted on epoch after lock expires")
}

// TestGCDropsObjectInhumedFromWritecache runs the shared scenario
// testGCDropsObjectInhumedFromWritecache as two parallel subtests: one that
// flushes the write-cache before inhuming the object and one that does not.
func TestGCDropsObjectInhumedFromWritecache(t *testing.T) {
	t.Parallel()

	scenarios := []struct {
		name              string
		flushBeforeInhume bool
	}{
		{name: "flush write-cache before inhume", flushBeforeInhume: true},
		{name: "don't flush write-cache before inhume", flushBeforeInhume: false},
	}

	for _, sc := range scenarios {
		sc := sc // per-iteration copy for the parallel closure (required before Go 1.22)
		t.Run(sc.name, func(t *testing.T) {
			t.Parallel()
			testGCDropsObjectInhumedFromWritecache(t, sc.flushBeforeInhume)
		})
	}
}

// testGCDropsObjectInhumedFromWritecache puts an object generated with
// testutil.GenerateObjectWithSize(1024) into a shard created with GC disabled
// and write-cache background flush disabled, optionally flushes the
// write-cache, marks the object as garbage through Inhume and then checks that
// an explicit removeGarbage run succeeds and reports exactly one deletion.
//
// flushbeforeInhume selects whether the write-cache is flushed before the
// object is inhumed; it also decides whether the blobstor is expected to hold
// the object afterwards.
func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool) {
	sh := newCustomShard(t, true, shardOptions{
		additionalShardOptions: []Option{WithDisabledGC()},
		wcOpts:                 []writecache.Option{writecache.WithDisableBackgroundFlush()},
	})
	defer func() { require.NoError(t, sh.Close()) }()

	ctx := context.Background()
	obj := testutil.GenerateObjectWithSize(1024)
	addr := objectCore.AddressOf(obj)

	var putPrm PutPrm
	putPrm.SetObject(obj)
	_, err := sh.Put(ctx, putPrm)
	require.NoError(t, err)

	// writecache stores object
	wcObj, err := sh.writeCache.Head(ctx, addr)
	require.NoError(t, err)
	require.Equal(t, addr, objectCore.AddressOf(wcObj))

	// blobstore doesn't store object
	bsRes, err := sh.blobStor.Get(ctx, common.GetPrm{Address: addr})
	require.ErrorAs(t, err, new(*apistatus.ObjectNotFound))
	require.Nil(t, bsRes.Object)
	require.Nil(t, bsRes.RawData)

	if flushbeforeInhume {
		// A failed flush must fail the test right here instead of surfacing
		// later as a confusing blobstor assertion failure.
		require.NoError(t, sh.writeCache.Flush(ctx, false, false))
	}

	var inhumePrm InhumePrm
	inhumePrm.MarkAsGarbage(addr)
	_, err = sh.Inhume(ctx, inhumePrm)
	require.NoError(t, err)

	// writecache doesn't store object
	wcObj, err = sh.writeCache.Head(ctx, addr)
	require.Error(t, err)
	require.Nil(t, wcObj)

	bsRes, err = sh.blobStor.Get(ctx, common.GetPrm{Address: addr})
	if flushbeforeInhume {
		// blobstore stores object
		require.NoError(t, err)
		require.Equal(t, addr, objectCore.AddressOf(bsRes.Object))
	} else {
		// blobstore doesn't store object
		require.ErrorAs(t, err, new(*apistatus.ObjectNotFound))
		require.Nil(t, bsRes.Object)
		require.Nil(t, bsRes.RawData)
	}

	gcRes := sh.removeGarbage(ctx)
	require.True(t, gcRes.success)
	require.Equal(t, uint64(1), gcRes.deleted)
}

// TestGCDontDeleteObjectFromWritecache puts an object into a shard created
// with GC disabled and write-cache background flush disabled, marks it with a
// GC mark directly in the metabase (leaving the write-cache untouched), and
// checks that an explicit removeGarbage run succeeds with zero deletions while
// the write-cache still serves the object.
func TestGCDontDeleteObjectFromWritecache(t *testing.T) {
	sh := newCustomShard(t, true, shardOptions{
		additionalShardOptions: []Option{WithDisabledGC()},
		wcOpts:                 []writecache.Option{writecache.WithDisableBackgroundFlush()},
	})
	defer func() { require.NoError(t, sh.Close()) }()

	ctx := context.Background()
	obj := testutil.GenerateObjectWithSize(1024)
	addr := objectCore.AddressOf(obj)

	// requireInWritecache asserts that the write-cache returns the object header.
	requireInWritecache := func() {
		cached, headErr := sh.writeCache.Head(ctx, addr)
		require.NoError(t, headErr)
		require.Equal(t, addr, objectCore.AddressOf(cached))
	}

	var putPrm PutPrm
	putPrm.SetObject(obj)
	_, err := sh.Put(ctx, putPrm)
	require.NoError(t, err)

	// writecache stores object
	requireInWritecache()

	// blobstore doesn't store object
	bsRes, err := sh.blobStor.Get(ctx, common.GetPrm{Address: addr})
	require.ErrorAs(t, err, new(*apistatus.ObjectNotFound))
	require.Nil(t, bsRes.Object)
	require.Nil(t, bsRes.RawData)

	// Put the GC mark straight into the metabase, bypassing shard.Inhume.
	var gcMarkPrm meta.InhumePrm
	gcMarkPrm.SetAddresses(addr)
	gcMarkPrm.SetLockObjectHandling()
	gcMarkPrm.SetGCMark()
	_, err = sh.metaBase.Inhume(ctx, gcMarkPrm)
	require.NoError(t, err)

	// logs: WARN	shard/delete.go:98	can't remove object: object must be flushed from writecache
	gcRes := sh.removeGarbage(ctx)
	require.True(t, gcRes.success)
	require.Equal(t, uint64(0), gcRes.deleted)

	// writecache still stores object
	requireInWritecache()
}