package meta_test

import (
	"context"
	"errors"
	"testing"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
	cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

func TestDB_Delete(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	cnr := cidtest.ID()
	parent := testutil.GenerateObjectWithCID(cnr)
	testutil.AddAttribute(parent, "foo", "bar")

	child := testutil.GenerateObjectWithCID(cnr)
	child.SetParent(parent)
	idParent, _ := parent.ID()
	child.SetParentID(idParent)

	// put object with parent
	err := putBig(db, child)
	require.NoError(t, err)

	// fill ToMoveIt index
	err = metaToMoveIt(db, object.AddressOf(child))
	require.NoError(t, err)

	// check if Movable list is not empty
	l, err := metaMovable(db)
	require.NoError(t, err)
	require.Len(t, l, 1)

	// try to remove parent, should be no-op, error-free
	err = metaDelete(db, object.AddressOf(parent))
	require.NoError(t, err)

	// inhume parent and child so they will be on graveyard
	ts := testutil.GenerateObjectWithCID(cnr)

	err = metaInhume(db, object.AddressOf(child), object.AddressOf(ts))
	require.NoError(t, err)

	ts = testutil.GenerateObjectWithCID(cnr)

	err = metaInhume(db, object.AddressOf(parent), object.AddressOf(ts))
	require.NoError(t, err)

	// delete object
	err = metaDelete(db, object.AddressOf(child))
	require.NoError(t, err)

	// check if there is no data in Movable index
	l, err = metaMovable(db)
	require.NoError(t, err)
	require.Len(t, l, 0)

	// check if they marked as already removed

	ok, err := metaExists(db, object.AddressOf(child))
	require.True(t, client.IsErrObjectAlreadyRemoved(err))
	require.False(t, ok)

	ok, err = metaExists(db, object.AddressOf(parent))
	require.True(t, client.IsErrObjectAlreadyRemoved(err))
	require.False(t, ok)
}

func TestDeleteAllChildren(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	cnr := cidtest.ID()

	// generate parent object
	parent := testutil.GenerateObjectWithCID(cnr)

	// generate 2 children
	child1 := testutil.GenerateObjectWithCID(cnr)
	child1.SetParent(parent)
	idParent, _ := parent.ID()
	child1.SetParentID(idParent)

	child2 := testutil.GenerateObjectWithCID(cnr)
	child2.SetParent(parent)
	child2.SetParentID(idParent)

	// put children
	require.NoError(t, putBig(db, child1))
	require.NoError(t, putBig(db, child2))

	// Exists should return split info for parent
	_, err := metaExists(db, object.AddressOf(parent))
	siErr := objectSDK.NewSplitInfoError(nil)
	require.True(t, errors.As(err, &siErr))

	// remove all children in single call
	err = metaDelete(db, object.AddressOf(child1), object.AddressOf(child2))
	require.NoError(t, err)

	// parent should not be found now
	ex, err := metaExists(db, object.AddressOf(parent))
	require.NoError(t, err)
	require.False(t, ex)
}

func TestGraveOnlyDelete(t *testing.T) {
	db := newDB(t)
	defer func() { require.NoError(t, db.Close()) }()

	addr := oidtest.Address()

	// inhume non-existent object by address
	require.NoError(t, metaInhume(db, addr, oidtest.Address()))

	// delete the object data
	require.NoError(t, metaDelete(db, addr))
}

func TestExpiredObject(t *testing.T) {
	db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
	defer func() { require.NoError(t, db.Close()) }()

	checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) {
		// removing expired object should be error-free
		require.NoError(t, metaDelete(db, object.AddressOf(exp)))

		require.NoError(t, metaDelete(db, object.AddressOf(nonExp)))
	})
}

func TestDelete(t *testing.T) {
	db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
	defer func() { require.NoError(t, db.Close()) }()

	cnr := cidtest.ID()
	for i := 0; i < 10; i++ {
		obj := testutil.GenerateObjectWithCID(cnr)

		var prm meta.PutPrm
		prm.SetObject(obj)
		prm.SetStorageID([]byte("0/0"))
		_, err := db.Put(context.Background(), prm)
		require.NoError(t, err)

		var inhumePrm meta.InhumePrm
		inhumePrm.SetAddresses(object.AddressOf(obj))
		_, err = db.Inhume(context.Background(), inhumePrm)
		require.NoError(t, err)
	}

	var addrs []oid.Address
	var iprm meta.GarbageIterationPrm
	iprm.SetHandler(func(o meta.GarbageObject) error {
		addrs = append(addrs, o.Address())
		return nil
	})
	require.NoError(t, db.IterateOverGarbage(context.Background(), iprm))
	require.Equal(t, 10, len(addrs))
	var deletePrm meta.DeletePrm
	deletePrm.SetAddresses(addrs...)
	_, err := db.Delete(context.Background(), deletePrm)
	require.NoError(t, err)

	addrs = nil
	iprm.SetHandler(func(o meta.GarbageObject) error {
		addrs = append(addrs, o.Address())
		return nil
	})
	require.NoError(t, db.IterateOverGarbage(context.Background(), iprm))
	require.Equal(t, 0, len(addrs))
}

func TestDeleteDropsGCMarkIfObjectNotFound(t *testing.T) {
	db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
	defer func() { require.NoError(t, db.Close()) }()

	addr := oidtest.Address()

	var prm meta.InhumePrm
	prm.SetAddresses(addr)
	prm.SetGCMark()
	_, err := db.Inhume(context.Background(), prm)
	require.NoError(t, err)

	var garbageCount int
	var itPrm meta.GarbageIterationPrm
	itPrm.SetHandler(func(g meta.GarbageObject) error {
		garbageCount++
		return nil
	})
	require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
	require.Equal(t, 1, garbageCount)

	var delPrm meta.DeletePrm
	delPrm.SetAddresses(addr)
	_, err = db.Delete(context.Background(), delPrm)
	require.NoError(t, err)

	garbageCount = 0
	require.NoError(t, db.IterateOverGarbage(context.Background(), itPrm))
	require.Equal(t, 0, garbageCount)
}

func metaDelete(db *meta.DB, addrs ...oid.Address) error {
	var deletePrm meta.DeletePrm
	deletePrm.SetAddresses(addrs...)

	_, err := db.Delete(context.Background(), deletePrm)
	return err
}