package meta_test

import (
	"context"
	"testing"

	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/stretchr/testify/require"
)

// objCount is the number of objects the counter tests in this file put
// into a metabase before checking its counters.
const objCount = 10

// TestCounters checks that the metabase's global object counters (Phy, Logic,
// User) and its per-container counters follow Put, Delete and Inhume
// operations, both for plain objects and for objects that carry parent
// (split) information. Every subtest obtains its database from its own
// newDB call and closes it when the subtest returns.
func TestCounters(t *testing.T) {
	t.Parallel()

	// A fresh database reports zero for every counter and lists no containers.
	t.Run("defaults", func(t *testing.T) {
		t.Parallel()
		db := newDB(t)
		defer func() { require.NoError(t, db.Close(context.Background())) }()
		c, err := db.ObjectCounters()
		require.NoError(t, err)
		require.Zero(t, c.Phy)
		require.Zero(t, c.Logic)
		require.Zero(t, c.User)

		cc, err := db.ContainerCounters(context.Background())
		require.NoError(t, err)
		require.Zero(t, len(cc.Counts))
	})

	// Each Put must raise every global counter by one and add a
	// {1, 1, 1} entry for the object's container.
	t.Run("put", func(t *testing.T) {
		t.Parallel()
		db := newDB(t)
		defer func() { require.NoError(t, db.Close(context.Background())) }()
		oo := make([]*objectSDK.Object, 0, objCount)
		for range objCount {
			oo = append(oo, testutil.GenerateObject())
		}

		var prm meta.PutPrm
		exp := make(map[cid.ID]meta.ObjectCounters)

		for i := range objCount {
			prm.SetObject(oo[i])
			cnrID, _ := oo[i].ContainerID()
			// NOTE(review): a count of one per container presumes that
			// testutil.GenerateObject puts every object into its own
			// container — confirm against the helper.
			exp[cnrID] = meta.ObjectCounters{
				Logic: 1,
				Phy:   1,
				User:  1,
			}

			_, err := db.Put(context.Background(), prm)
			require.NoError(t, err)

			c, err := db.ObjectCounters()
			require.NoError(t, err)

			require.Equal(t, uint64(i+1), c.Phy)
			require.Equal(t, uint64(i+1), c.Logic)
			require.Equal(t, uint64(i+1), c.User)

			cc, err := db.ContainerCounters(context.Background())
			require.NoError(t, err)

			require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
		}
	})

	// Each Delete must lower every global counter by one and zero the
	// counters of the object's container.
	t.Run("delete", func(t *testing.T) {
		t.Parallel()
		db := newDB(t)
		defer func() { require.NoError(t, db.Close(context.Background())) }()
		oo := putObjs(t, db, objCount, false)

		exp := make(map[cid.ID]meta.ObjectCounters)
		for _, obj := range oo {
			cnrID, _ := obj.ContainerID()
			exp[cnrID] = meta.ObjectCounters{
				Logic: 1,
				Phy:   1,
				User:  1,
			}
		}

		var prm meta.DeletePrm
		// Objects are removed in reverse insertion order, so after removing
		// oo[i] exactly i objects remain.
		for i := objCount - 1; i >= 0; i-- {
			prm.SetAddresses(objectcore.AddressOf(oo[i]))

			res, err := db.Delete(context.Background(), prm)
			require.NoError(t, err)
			require.Equal(t, uint64(1), res.LogicCount())

			c, err := db.ObjectCounters()
			require.NoError(t, err)

			require.Equal(t, uint64(i), c.Phy)
			require.Equal(t, uint64(i), c.Logic)
			require.Equal(t, uint64(i), c.User)

			// The zeroed entry is kept in exp on purpose: the comparison
			// below expects ContainerCounters to still list the container.
			cnrID, _ := oo[i].ContainerID()
			if v, ok := exp[cnrID]; ok {
				v.Phy--
				v.Logic--
				v.User--
				exp[cnrID] = v
			}

			cc, err := db.ContainerCounters(context.Background())
			require.NoError(t, err)
			require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
		}
	})

	// Inhuming with a tombstone must lower the Logic and User counters but
	// leave Phy untouched.
	t.Run("inhume", func(t *testing.T) {
		t.Parallel()
		db := newDB(t)
		defer func() { require.NoError(t, db.Close(context.Background())) }()
		oo := putObjs(t, db, objCount, false)

		exp := make(map[cid.ID]meta.ObjectCounters)
		for _, obj := range oo {
			cnrID, _ := obj.ContainerID()
			exp[cnrID] = meta.ObjectCounters{
				Logic: 1,
				Phy:   1,
				User:  1,
			}
		}

		// The first half of the stored objects is inhumed.
		inhumedObjs := make([]oid.Address, objCount/2)
		for i := range inhumedObjs {
			inhumedObjs[i] = objectcore.AddressOf(oo[i])
		}

		for _, addr := range inhumedObjs {
			if v, ok := exp[addr.Container()]; ok {
				v.Logic--
				v.User--
				if v.IsZero() {
					delete(exp, addr.Container())
				} else {
					exp[addr.Container()] = v
				}
			}
		}

		var prm meta.InhumePrm
		for _, o := range inhumedObjs {
			// The tombstone is given the same container as the object it covers.
			tombAddr := oidtest.Address()
			tombAddr.SetContainer(o.Container())

			prm.SetTombstoneAddress(tombAddr)
			prm.SetAddresses(o)

			res, err := db.Inhume(context.Background(), prm)
			require.NoError(t, err)
			require.Equal(t, uint64(1), res.LogicInhumed())
			require.Equal(t, uint64(1), res.UserInhumed())
		}

		c, err := db.ObjectCounters()
		require.NoError(t, err)

		require.Equal(t, uint64(objCount), c.Phy)
		require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic)
		require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User)

		cc, err := db.ContainerCounters(context.Background())
		require.NoError(t, err)

		require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
	})

	t.Run("put_split", func(t *testing.T) {
		t.Parallel()
		db := newDB(t)
		defer func() { require.NoError(t, db.Close(context.Background())) }()
		parObj := testutil.GenerateObject()

		exp := make(map[cid.ID]meta.ObjectCounters)

		// put objects and check that parent info
		// does not affect the counter
		for i := range objCount {
			o := testutil.GenerateObject()
			if i < objCount/2 { // half of the objs will have the parent
				o.SetParent(parObj)
				o.SetSplitID(objectSDK.NewSplitID())
			}

			cnrID, _ := o.ContainerID()
			exp[cnrID] = meta.ObjectCounters{
				Logic: 1,
				Phy:   1,
				User:  1,
			}

			require.NoError(t, putBig(db, o))

			c, err := db.ObjectCounters()
			require.NoError(t, err)
			require.Equal(t, uint64(i+1), c.Phy)
			require.Equal(t, uint64(i+1), c.Logic)
			require.Equal(t, uint64(i+1), c.User)

			cc, err := db.ContainerCounters(context.Background())
			require.NoError(t, err)
			require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
		}
	})

	t.Run("delete_split", func(t *testing.T) {
		t.Parallel()
		db := newDB(t)
		defer func() { require.NoError(t, db.Close(context.Background())) }()
		oo := putObjs(t, db, objCount, true)

		exp := make(map[cid.ID]meta.ObjectCounters)
		for _, obj := range oo {
			cnrID, _ := obj.ContainerID()
			exp[cnrID] = meta.ObjectCounters{
				Logic: 1,
				Phy:   1,
				User:  1,
			}
		}

		// delete objects that have parent info
		// and check that it does not affect
		// the counter
		for i, o := range oo {
			addr := objectcore.AddressOf(o)
			require.NoError(t, metaDelete(db, addr))

			c, err := db.ObjectCounters()
			require.NoError(t, err)
			require.Equal(t, uint64(objCount-i-1), c.Phy)
			require.Equal(t, uint64(objCount-i-1), c.Logic)
			require.Equal(t, uint64(objCount-i-1), c.User)

			// NOTE(review): exp is updated here but never compared against
			// db.ContainerCounters in this subtest — confirm whether a
			// per-container assertion is missing.
			if v, ok := exp[addr.Container()]; ok {
				v.Logic--
				v.Phy--
				v.User--
				if v.IsZero() {
					delete(exp, addr.Container())
				} else {
					exp[addr.Container()] = v
				}
			}
		}
	})

	// Same as "inhume", but every stored object carries parent info.
	t.Run("inhume_split", func(t *testing.T) {
		t.Parallel()
		db := newDB(t)
		defer func() { require.NoError(t, db.Close(context.Background())) }()
		oo := putObjs(t, db, objCount, true)

		exp := make(map[cid.ID]meta.ObjectCounters)
		for _, obj := range oo {
			cnrID, _ := obj.ContainerID()
			exp[cnrID] = meta.ObjectCounters{
				Logic: 1,
				Phy:   1,
				User:  1,
			}
		}

		// The first half of the stored objects is inhumed.
		inhumedObjs := make([]oid.Address, objCount/2)
		for i := range inhumedObjs {
			inhumedObjs[i] = objectcore.AddressOf(oo[i])
		}

		for _, addr := range inhumedObjs {
			if v, ok := exp[addr.Container()]; ok {
				v.Logic--
				v.User--
				if v.IsZero() {
					delete(exp, addr.Container())
				} else {
					exp[addr.Container()] = v
				}
			}
		}

		var prm meta.InhumePrm
		for _, o := range inhumedObjs {
			// The tombstone is given the same container as the object it covers.
			tombAddr := oidtest.Address()
			tombAddr.SetContainer(o.Container())

			prm.SetTombstoneAddress(tombAddr)
			prm.SetAddresses(o)

			_, err := db.Inhume(context.Background(), prm)
			require.NoError(t, err)
		}

		c, err := db.ObjectCounters()
		require.NoError(t, err)

		require.Equal(t, uint64(objCount), c.Phy)
		require.Equal(t, uint64(objCount-len(inhumedObjs)), c.Logic)
		require.Equal(t, uint64(objCount-len(inhumedObjs)), c.User)

		cc, err := db.ContainerCounters(context.Background())
		require.NoError(t, err)

		require.Equal(t, meta.ContainerCounters{Counts: exp}, cc)
	})
}

// TestDoublePut stores the same object twice and checks that only the first
// Put reports an insertion, while the global and per-container counters stay
// at one after both calls.
func TestDoublePut(t *testing.T) {
	t.Parallel()
	db := newDB(t)
	defer func() { require.NoError(t, db.Close(context.Background())) }()

	object := testutil.GenerateObject()
	container, _ := object.ContainerID()
	wantPerContainer := map[cid.ID]meta.ObjectCounters{
		container: {Logic: 1, Phy: 1, User: 1},
	}

	// requireSingleObject asserts that exactly one object is accounted for,
	// both globally and in the object's container.
	requireSingleObject := func() {
		t.Helper()

		total, err := db.ObjectCounters()
		require.NoError(t, err)

		require.Equal(t, uint64(1), total.Phy)
		require.Equal(t, uint64(1), total.Logic)
		require.Equal(t, uint64(1), total.User)

		byContainer, err := db.ContainerCounters(context.Background())
		require.NoError(t, err)

		require.Equal(t, meta.ContainerCounters{Counts: wantPerContainer}, byContainer)
	}

	var putPrm meta.PutPrm
	putPrm.SetObject(object)

	// First Put: the object is new.
	first, err := db.Put(context.Background(), putPrm)
	require.NoError(t, err)
	require.True(t, first.Inserted)
	requireSingleObject()

	// Second Put of the very same object: nothing is inserted and the
	// counters must not move.
	second, err := db.Put(context.Background(), putPrm)
	require.NoError(t, err)
	require.False(t, second.Inserted)
	requireSingleObject()
}

// TestCounters_Expired walks objects through expiration, GC-mark inhuming and
// deletion, checking the global and per-container counters after every step.
func TestCounters_Expired(t *testing.T) {
	// This test covers expired objects that have no GCMark yet.
	// Such objects are still treated as logically available:
	// the logic counter is decremented explicitly and only by
	// the `Delete` and `Inhume` operations, otherwise it would
	// be impossible to maintain it.

	const epoch = 123

	state := &epochState{epoch}
	db := newDB(t, meta.WithEpochState(state))
	defer func() { require.NoError(t, db.Close(context.Background())) }()

	// Every object expires one epoch after the current one.
	addrs := make([]oid.Address, objCount)
	for i := range addrs {
		addrs[i] = putWithExpiration(t, db, objectSDK.TypeRegular, epoch+1)
	}

	perContainer := make(map[cid.ID]meta.ObjectCounters)
	for _, a := range addrs {
		perContainer[a.Container()] = meta.ObjectCounters{Logic: 1, Phy: 1, User: 1}
	}

	// requireCounters reads the global counters, then the per-container
	// ones, and compares them with the given totals and the current
	// contents of perContainer.
	requireCounters := func(phy, logic, user uint64) {
		t.Helper()

		total, err := db.ObjectCounters()
		require.NoError(t, err)
		require.Equal(t, phy, total.Phy)
		require.Equal(t, logic, total.Logic)
		require.Equal(t, user, total.User)

		byContainer, err := db.ContainerCounters(context.Background())
		require.NoError(t, err)

		require.Equal(t, meta.ContainerCounters{Counts: perContainer}, byContainer)
	}

	// 1. The objects are available and the counters are correct.

	requireCounters(objCount, objCount, objCount)

	for _, a := range addrs {
		_, err := metaGet(db, a, true)
		require.NoError(t, err)
	}

	// 2. The objects are expired and no longer available, but
	// the counters are unchanged.

	state.e = epoch + 2

	requireCounters(objCount, objCount, objCount)

	for _, a := range addrs {
		_, err := metaGet(db, a, true)
		require.ErrorIs(t, err, meta.ErrObjectIsExpired)
	}

	// 3. Inhuming an expired object with GCMark (as the GC
	// would do) should decrease the logic counter despite the
	// expiration.

	var inhumePrm meta.InhumePrm
	inhumePrm.SetGCMark()
	inhumePrm.SetAddresses(addrs[0])

	inhumeRes, err := db.Inhume(context.Background(), inhumePrm)
	require.NoError(t, err)
	require.Equal(t, uint64(1), inhumeRes.LogicInhumed())
	require.Equal(t, uint64(1), inhumeRes.UserInhumed())

	marked := addrs[0].Container()
	if v, ok := perContainer[marked]; ok {
		v.Logic--
		v.User--
		if !v.IsZero() {
			perContainer[marked] = v
		} else {
			delete(perContainer, marked)
		}
	}

	requireCounters(uint64(len(addrs)), uint64(len(addrs)-1), uint64(len(addrs)-1))

	// 4. `Delete` of an object with GCMark should decrease the
	// phy counter without affecting the logic counter (after
	// this step they should be equal).

	var deletePrm meta.DeletePrm
	deletePrm.SetAddresses(addrs[0])

	deleteRes, err := db.Delete(context.Background(), deletePrm)
	require.NoError(t, err)
	require.Zero(t, deleteRes.LogicCount())
	require.Zero(t, deleteRes.UserCount())

	if v, ok := perContainer[marked]; ok {
		v.Phy--
		perContainer[marked] = v
	}

	addrs = addrs[1:]
	remaining := uint64(len(addrs))

	requireCounters(remaining, remaining, remaining)

	// 5. `Delete` of an expired object (as the control service
	// would do) should decrease all counters despite the
	// expiration.

	deletePrm.SetAddresses(addrs[0])

	deleteRes, err = db.Delete(context.Background(), deletePrm)
	require.NoError(t, err)
	require.Equal(t, uint64(1), deleteRes.LogicCount())
	require.Equal(t, uint64(1), deleteRes.UserCount())

	removed := addrs[0].Container()
	if v, ok := perContainer[removed]; ok {
		v.Phy--
		v.Logic--
		v.User--
		perContainer[removed] = v
	}

	addrs = addrs[1:]
	remaining = uint64(len(addrs))

	requireCounters(remaining, remaining, remaining)
}

// putObjs stores count freshly generated objects in db and returns them in
// insertion order. When withParent is set, every object is given the same
// generated parent and its own new split ID before it is stored.
//
// After each Put the global Phy, Logic and User counters are required to
// equal the number of objects stored so far, so db must hold no objects
// when putObjs is called.
func putObjs(t *testing.T, db *meta.DB, count int, withParent bool) []*objectSDK.Object {
	// Report assertion failures at the caller's line.
	t.Helper()

	var prm meta.PutPrm
	parent := testutil.GenerateObject()

	oo := make([]*objectSDK.Object, 0, count)
	for i := range count {
		o := testutil.GenerateObject()
		if withParent {
			o.SetParent(parent)
			o.SetSplitID(objectSDK.NewSplitID())
		}

		oo = append(oo, o)

		prm.SetObject(o)
		_, err := db.Put(context.Background(), prm)
		require.NoError(t, err)

		c, err := db.ObjectCounters()
		require.NoError(t, err)

		require.Equal(t, uint64(i+1), c.Phy)
		require.Equal(t, uint64(i+1), c.Logic)
		require.Equal(t, uint64(i+1), c.User)
	}

	return oo
}