package shard_test

import (
	"context"
	"errors"
	"testing"
	"time"

	objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
	objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

// Test_GCDropsLockedExpiredSimpleObject checks that a locked simple object is
// no longer retrievable from the shard once both the object and its lock have
// expired.
//
// The object carries expiration epoch 101 and the lock carries 103. The shard
// starts at epoch 100; the test then moves the epoch to 105, notifies the
// shard about it and waits until Get reports "not found".
func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	epoch := &epochState{
		Value: 100,
	}

	wcOpts := writecacheconfig.Options{
		Type: writecacheconfig.TypeBBolt,
	}
	sh := newCustomShard(t, t.TempDir(), false, wcOpts, nil, []meta.Option{meta.WithEpochState(epoch)})

	t.Cleanup(func() {
		releaseShard(sh, t)
	})

	cnr := cidtest.ID()

	// Object that expires at epoch 101.
	var objExpirationAttr objectSDK.Attribute
	objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
	objExpirationAttr.SetValue("101")

	obj := testutil.GenerateObjectWithCID(cnr)
	obj.SetAttributes(objExpirationAttr)
	// Fail early if the ID is missing: otherwise the zero ID would be locked
	// and the test would exercise something other than what it claims.
	objID, ok := obj.ID()
	require.True(t, ok, "generated object must have an ID")

	// Lock object that expires at epoch 103, i.e. later than the object itself.
	var lockExpirationAttr objectSDK.Attribute
	lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
	lockExpirationAttr.SetValue("103")

	lock := testutil.GenerateObjectWithCID(cnr)
	lock.SetType(objectSDK.TypeLock)
	lock.SetAttributes(lockExpirationAttr)
	lockID, ok := lock.ID()
	require.True(t, ok, "generated lock must have an ID")

	var putPrm shard.PutPrm
	putPrm.SetObject(obj)

	_, err := sh.Put(ctx, putPrm)
	require.NoError(t, err)

	err = sh.Lock(ctx, cnr, lockID, []oid.ID{objID})
	require.NoError(t, err)

	putPrm.SetObject(lock)
	_, err = sh.Put(ctx, putPrm)
	require.NoError(t, err)

	// Move past both expiration epochs and notify the shard about the new epoch.
	epoch.Value = 105
	sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value)

	var getPrm shard.GetPrm
	getPrm.SetAddress(objectCore.AddressOf(obj))
	require.Eventually(t, func() bool {
		// Keep the error local: the condition may be invoked from a goroutine
		// owned by require.Eventually, so it must not write to outer variables.
		_, getErr := sh.Get(ctx, getPrm)
		return shard.IsErrNotFound(getErr)
	}, 3*time.Second, 1*time.Second, "expired object must be deleted")
}

// Test_GCDropsLockedExpiredComplexObject checks that a locked complex (split)
// object is no longer retrievable from the shard once both the object and its
// lock have expired.
//
// The parent header carries expiration epoch 101 and the lock carries 103.
// Ten children and a link object are stored, then the children, the parent
// and the link are locked together. Before the epoch change, Get on the parent
// address must return a split info error; after the epoch is moved to 105 and
// the shard is notified, Get must eventually report "not found".
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	epoch := &epochState{
		Value: 100,
	}

	cnr := cidtest.ID()
	parentID := oidtest.ID()
	splitID := objectSDK.NewSplitID()

	var objExpirationAttr objectSDK.Attribute
	objExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
	objExpirationAttr.SetValue("101")

	var lockExpirationAttr objectSDK.Attribute
	lockExpirationAttr.SetKey(objectV2.SysAttributeExpEpoch)
	lockExpirationAttr.SetValue("103")

	// Parent header: no payload of its own, expires at epoch 101.
	parent := testutil.GenerateObjectWithCID(cnr)
	parent.SetID(parentID)
	parent.SetPayload(nil)
	parent.SetAttributes(objExpirationAttr)

	const childCount = 10
	children := make([]*objectSDK.Object, childCount)
	childIDs := make([]oid.ID, childCount)
	for i := range children {
		child := testutil.GenerateObjectWithCID(cnr)
		if i != 0 {
			// Chain every child to the one before it.
			child.SetPreviousID(childIDs[i-1])
		}
		if i == childCount-1 {
			// Only the last child carries the parent header.
			child.SetParent(parent)
		}
		child.SetSplitID(splitID)
		child.SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})

		// Fail early on a missing ID instead of silently recording the zero ID.
		id, ok := child.ID()
		require.True(t, ok, "generated child must have an ID")

		children[i] = child
		childIDs[i] = id
	}

	link := testutil.GenerateObjectWithCID(cnr)
	link.SetParent(parent)
	link.SetParentID(parentID)
	link.SetSplitID(splitID)
	link.SetChildren(childIDs...)

	linkID, ok := link.ID()
	require.True(t, ok, "generated link must have an ID")

	wcOpts := writecacheconfig.Options{
		Type: writecacheconfig.TypeBBolt,
	}
	sh := newCustomShard(t, t.TempDir(), false, wcOpts, nil, []meta.Option{meta.WithEpochState(epoch)})

	t.Cleanup(func() {
		releaseShard(sh, t)
	})

	lock := testutil.GenerateObjectWithCID(cnr)
	lock.SetType(objectSDK.TypeLock)
	lock.SetAttributes(lockExpirationAttr)
	lockID, ok := lock.ID()
	require.True(t, ok, "generated lock must have an ID")

	var putPrm shard.PutPrm

	for _, child := range children {
		putPrm.SetObject(child)
		_, err := sh.Put(ctx, putPrm)
		require.NoError(t, err)
	}

	putPrm.SetObject(link)
	_, err := sh.Put(ctx, putPrm)
	require.NoError(t, err)

	// Build the lock target list in a fresh slice so that it can never alias
	// the backing array of childIDs.
	locked := make([]oid.ID, 0, len(childIDs)+2)
	locked = append(locked, childIDs...)
	locked = append(locked, parentID, linkID)

	err = sh.Lock(ctx, cnr, lockID, locked)
	require.NoError(t, err)

	putPrm.SetObject(lock)
	_, err = sh.Put(ctx, putPrm)
	require.NoError(t, err)

	var getPrm shard.GetPrm
	getPrm.SetAddress(objectCore.AddressOf(parent))

	// Before expiration, requesting the parent address must yield split info.
	_, err = sh.Get(ctx, getPrm)
	var splitInfoError *objectSDK.SplitInfoError
	require.True(t, errors.As(err, &splitInfoError), "split info must be provided")

	// Move past both expiration epochs and notify the shard about the new epoch.
	epoch.Value = 105
	sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value)

	require.Eventually(t, func() bool {
		// Keep the error local: the condition may be invoked from a goroutine
		// owned by require.Eventually, so it must not write to outer variables.
		_, getErr := sh.Get(ctx, getPrm)
		return shard.IsErrNotFound(getErr)
	}, 3*time.Second, 1*time.Second, "expired complex object must be deleted on epoch after lock expires")
}