package shard

import (
	"context"
	"errors"
	"testing"

	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

// Test_GCDropsLockedExpiredSimpleObject stores a regular object carrying the
// expiration-epoch attribute "101", locks it with a lock object carrying the
// expiration-epoch attribute "103", then moves the epoch to 105 and feeds a
// new-epoch event to the shard GC. Afterwards a Get for the object must fail
// with an object-not-found error.
func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	currentEpoch := &epochState{Value: 100}

	// synchronous event processing
	syncPool := func(int) util.WorkerPool {
		return util.NewPseudoWorkerPool()
	}

	opts := shardOptions{
		metaOptions:            []meta.Option{meta.WithEpochState(currentEpoch)},
		additionalShardOptions: []Option{WithGCWorkerPoolInitializer(syncPool)},
	}

	sh := newCustomShard(t, false, opts)
	defer func() { require.NoError(t, sh.Close()) }()

	containerID := cidtest.ID()

	// expiresAt builds the system expiration-epoch attribute with the given value.
	expiresAt := func(value string) objectSDK.Attribute {
		var attr objectSDK.Attribute
		attr.SetKey(objectV2.SysAttributeExpEpoch)
		attr.SetValue(value)
		return attr
	}

	// The second result of ID() is discarded for both generated objects.
	target := testutil.GenerateObjectWithCID(containerID)
	target.SetAttributes(expiresAt("101"))
	targetID, _ := target.ID()

	locker := testutil.GenerateObjectWithCID(containerID)
	locker.SetType(objectSDK.TypeLock)
	locker.SetAttributes(expiresAt("103"))
	lockerID, _ := locker.ID()

	// A single PutPrm is reused for every Put, only the object is swapped.
	var prm PutPrm
	store := func(o *objectSDK.Object) {
		prm.SetObject(o)
		_, putErr := sh.Put(ctx, prm)
		require.NoError(t, putErr)
	}

	// Order matters: the target is stored first, the lock relation is
	// registered next, and the lock object itself is stored last.
	store(target)
	require.NoError(t, sh.Lock(ctx, containerID, lockerID, []oid.ID{targetID}))
	store(locker)

	// Move past both expiration epochs and let the GC handle the new epoch.
	currentEpoch.Value = 105
	sh.gc.handleEvent(ctx, EventNewEpoch(currentEpoch.Value))

	var lookup GetPrm
	lookup.SetAddress(objectCore.AddressOf(target))
	_, err := sh.Get(ctx, lookup)
	require.True(t, client.IsErrObjectNotFound(err), "expired object must be deleted")
}

// Test_GCDropsLockedExpiredComplexObject builds a split object: a parent
// header carrying the expiration-epoch attribute "101", ten chained children
// sharing one split ID, and a link object listing the children. Only the
// children and the link object are put into the shard. All of them, together
// with the parent ID, are locked by a lock object carrying the
// expiration-epoch attribute "103".
//
// Before the epoch changes, a Get by the parent address must return an error
// matching *objectSDK.SplitInfoError. After the epoch is moved to 105 and the
// GC handles the new-epoch event, the same Get must report object-not-found.
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	currentEpoch := &epochState{Value: 100}

	containerID := cidtest.ID()
	parentID := oidtest.ID()
	splitID := objectSDK.NewSplitID()

	// expiresAt builds the system expiration-epoch attribute with the given value.
	expiresAt := func(value string) objectSDK.Attribute {
		var attr objectSDK.Attribute
		attr.SetKey(objectV2.SysAttributeExpEpoch)
		attr.SetValue(value)
		return attr
	}
	objectExpiration := expiresAt("101")
	lockExpiration := expiresAt("103")

	// Parent header: explicit ID, no payload, expiration attribute only.
	parent := testutil.GenerateObjectWithCID(containerID)
	parent.SetID(parentID)
	parent.SetPayload(nil)
	parent.SetAttributes(objectExpiration)

	const childCount = 10
	parts := make([]*objectSDK.Object, childCount)
	partIDs := make([]oid.ID, childCount)
	for i := 0; i < childCount; i++ {
		part := testutil.GenerateObjectWithCID(containerID)
		if i > 0 {
			// Every child except the first references its predecessor.
			part.SetPreviousID(partIDs[i-1])
		}
		if i == childCount-1 {
			// Only the last child carries the parent.
			part.SetParent(parent)
		}
		part.SetSplitID(splitID)
		part.SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})

		parts[i] = part
		partIDs[i], _ = part.ID()
	}

	linker := testutil.GenerateObjectWithCID(containerID)
	linker.SetParent(parent)
	linker.SetParentID(parentID)
	linker.SetSplitID(splitID)
	linker.SetChildren(partIDs...)

	linkerID, _ := linker.ID()

	// synchronous event processing
	syncPool := func(int) util.WorkerPool {
		return util.NewPseudoWorkerPool()
	}

	sh := newCustomShard(t, false, shardOptions{
		metaOptions:            []meta.Option{meta.WithEpochState(currentEpoch)},
		additionalShardOptions: []Option{WithGCWorkerPoolInitializer(syncPool)},
	})
	defer func() { require.NoError(t, sh.Close()) }()

	locker := testutil.GenerateObjectWithCID(containerID)
	locker.SetType(objectSDK.TypeLock)
	locker.SetAttributes(lockExpiration)
	lockerID, _ := locker.ID()

	// A single PutPrm is reused for every Put, only the object is swapped.
	var prm PutPrm
	store := func(o *objectSDK.Object) {
		prm.SetObject(o)
		_, putErr := sh.Put(ctx, prm)
		require.NoError(t, putErr)
	}

	for i := range parts {
		store(parts[i])
	}
	store(linker)

	// Locked set: all children, then the parent ID, then the link object ID.
	locked := make([]oid.ID, 0, childCount+2)
	locked = append(locked, partIDs...)
	locked = append(locked, parentID, linkerID)

	err := sh.Lock(ctx, containerID, lockerID, locked)
	require.NoError(t, err)

	store(locker)

	var lookup GetPrm
	lookup.SetAddress(objectCore.AddressOf(parent))

	// Before the epoch change the parent address resolves to split info.
	_, err = sh.Get(ctx, lookup)
	var splitErr *objectSDK.SplitInfoError
	gotSplitInfo := errors.As(err, &splitErr)
	require.True(t, gotSplitInfo, "split info must be provided")

	// Move past both expiration epochs and let the GC handle the new epoch.
	currentEpoch.Value = 105
	sh.gc.handleEvent(ctx, EventNewEpoch(currentEpoch.Value))

	_, err = sh.Get(ctx, lookup)
	require.True(t, client.IsErrObjectNotFound(err), "expired complex object must be deleted on epoch after lock expires")
}