package policer

import (
	"bytes"
	"context"
	"errors"
	"sort"
	"testing"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/panjf2000/ants/v2"
	"github.com/stretchr/testify/require"
)

// TestBuryObjectWithoutContainer checks that a running policer hands an object
// to the bury function when the container source reports that the object's
// container does not exist.
func TestBuryObjectWithoutContainer(t *testing.T) {
	// Key space
	addr := oidtest.Address()
	objs := []objectcore.AddressWithType{
		{
			Address: addr,
			Type:    objectSDK.TypeRegular,
		},
	}

	// Container source and bury function
	buryCh := make(chan oid.Address)
	containerSrc := func(id cid.ID) (*container.Container, error) {
		return nil, apistatus.ContainerNotFound{}
	}
	buryFn := func(ctx context.Context, a oid.Address) error {
		// buryCh is unbuffered and the test receives from it exactly once.
		// Also watch ctx so that any further call can give up instead of
		// blocking its goroutine forever once the test has returned.
		select {
		case buryCh <- a:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// Task pool
	pool, err := ants.NewPool(4)
	require.NoError(t, err)
	// Cleanup functions run after the deferred cancel below, so the pool is
	// released only once the policer has been told to stop.
	t.Cleanup(pool.Release)

	// Policer instance
	p := New(
		WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
		WithContainerSource(containerSrcFunc(containerSrc)),
		WithBuryFunc(buryFn),
		WithPool(pool),
		WithNodeLoader(constNodeLoader(0)),
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go p.Run(ctx)

	// Blocks until the policer buries something; it must be our object.
	require.Equal(t, addr, <-buryCh)
}

// TestProcessObject runs processObject against a table of placement scenarios
// and checks, for each one, whether the redundant-copy callback fired and
// which nodes the replicator was asked to replicate the object to.
func TestProcessObject(t *testing.T) {
	// Notes:
	// - nodes are referred to by their index throughout, which is embedded in the public key
	// - node with index 0 always refers to the local node, so there's no need to add it to objHolders
	// - policy is used only to match the number of replicas for each index in the placement
	//
	// Table fields:
	// - placement lists, per placement vector, the indices of the nodes it contains
	// - objHolders lists the remote nodes whose HEAD request succeeds; every
	//   other remote node answers with ObjectNotFound
	// - maintenanceNodes lists the nodes flagged as under maintenance in the netmap
	// - wantRemoveRedundant and wantReplicateTo are the expected observations
	tests := []struct {
		desc                string
		objType             objectSDK.Type
		nodeCount           int
		policy              string
		placement           [][]int
		objHolders          []int
		maintenanceNodes    []int
		wantRemoveRedundant bool
		wantReplicateTo     []int
	}{
		{
			desc:      "1 copy already held by local node",
			nodeCount: 1,
			policy:    `REP 1`,
			placement: [][]int{{0}},
		},
		{
			desc:                "1 copy already held by the remote node",
			nodeCount:           2,
			policy:              `REP 1`,
			placement:           [][]int{{1}},
			objHolders:          []int{1},
			wantRemoveRedundant: true,
		},
		{
			desc:            "1 copy not yet held by the remote node",
			nodeCount:       2,
			policy:          `REP 1`,
			placement:       [][]int{{1}},
			wantReplicateTo: []int{1},
		},
		{
			desc:       "2 copies already held by local and remote node",
			nodeCount:  2,
			policy:     `REP 2`,
			placement:  [][]int{{0, 1}},
			objHolders: []int{1},
		},
		{
			desc:            "2 copies but not held by remote node",
			nodeCount:       2,
			policy:          `REP 2`,
			placement:       [][]int{{0, 1}},
			wantReplicateTo: []int{1},
		},
		{
			desc:       "multiple vectors already held by remote node",
			nodeCount:  2,
			policy:     `REP 2 REP 2`,
			placement:  [][]int{{0, 1}, {0, 1}},
			objHolders: []int{1},
		},
		{
			desc:            "multiple vectors not yet held by remote node",
			nodeCount:       2,
			policy:          `REP 2 REP 2`,
			placement:       [][]int{{0, 1}, {0, 1}},
			wantReplicateTo: []int{1, 1}, // node 1 is expected once per placement vector; is this actually good?
		},
		{
			desc:            "lock object must be replicated to all nodes",
			objType:         objectSDK.TypeLock,
			nodeCount:       3,
			policy:          `REP 1`,
			placement:       [][]int{{0, 1, 2}},
			wantReplicateTo: []int{1, 2},
		},
		{
			desc:             "preserve local copy when maintenance nodes exist",
			nodeCount:        3,
			policy:           `REP 2`,
			placement:        [][]int{{1, 2}},
			objHolders:       []int{1},
			maintenanceNodes: []int{2},
		},
	}

	for i := range tests {
		// Copy the case so the subtest closure does not share the loop variable.
		ti := tests[i]
		t.Run(ti.desc, func(t *testing.T) {
			addr := oidtest.Address()

			// Netmap, placement policy and placement builder
			nodes := make([]netmap.NodeInfo, ti.nodeCount)
			for i := range nodes {
				// The single-byte public key doubles as the node index.
				nodes[i].SetPublicKey([]byte{byte(i)})
			}
			for _, i := range ti.maintenanceNodes {
				nodes[i].SetMaintenance()
			}

			var policy netmap.PlacementPolicy
			require.NoError(t, policy.DecodeString(ti.policy))

			// Translate the index-based placement into vectors of NodeInfo.
			placementVectors := make([][]netmap.NodeInfo, len(ti.placement))
			for i, pv := range ti.placement {
				for _, nj := range pv {
					placementVectors[i] = append(placementVectors[i], nodes[nj])
				}
			}
			// The builder answers only for the object under test; any other
			// request is reported as a test failure.
			placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
				if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
					return placementVectors, nil
				}
				t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
				return nil, errors.New("unexpected placement build")
			}

			// Object remote header
			headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
				index := int(ni.PublicKey()[0])
				// Only remote nodes of the netmap (index 1..nodeCount-1) may
				// be asked, and only about the object under test.
				if a != addr || index < 1 || index >= ti.nodeCount {
					t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
					return nil, errors.New("unexpected object head")
				}
				// Nodes listed as holders answer without an error.
				for _, i := range ti.objHolders {
					if index == i {
						return nil, nil
					}
				}
				return nil, apistatus.ObjectNotFound{}
			}

			// Container source
			cnr := &container.Container{}
			cnr.Value.Init()
			cnr.Value.SetPlacementPolicy(policy)
			containerSrc := func(id cid.ID) (*container.Container, error) {
				if id.Equals(addr.Container()) {
					return cnr, nil
				}
				t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
				return nil, apistatus.ContainerNotFound{}
			}
			// The container always exists here, so burying is never expected.
			buryFn := func(ctx context.Context, a oid.Address) error {
				t.Errorf("unexpected object buried: %v", a)
				return nil
			}

			// Policer instance
			// The callbacks below record what the policer decided to do.
			var gotRemoveRedundant bool
			var gotReplicateTo []int

			p := New(
				WithContainerSource(containerSrcFunc(containerSrc)),
				WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
				// Only the key of node 0 is treated as local.
				WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
					return bytes.Equal(k, nodes[0].PublicKey())
				})),
				WithRemoteObjectHeaderFunc(headFn),
				WithBuryFunc(buryFn),
				WithRedundantCopyCallback(func(_ context.Context, a oid.Address) {
					require.True(t, eqAddr(a, addr), "unexpected redundant copy callback: a=%v", a)
					gotRemoveRedundant = true
				}),
				WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
					require.True(t, eqAddr(task.Addr, addr), "unexpected replicator task: %+v", task)
					// Record the index of every node named in the task.
					for _, node := range task.Nodes {
						gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
					}
				})),
			)

			addrWithType := objectcore.AddressWithType{
				Address: addr,
				Type:    ti.objType,
			}

			p.processObject(context.Background(), addrWithType)
			// The order of the recorded nodes is not part of the expectation,
			// so sort before comparing.
			sort.Ints(gotReplicateTo)

			require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant)
			require.Equal(t, ti.wantReplicateTo, gotReplicateTo)
		})
	}
}

// TestIteratorContract drives the policer with a scripted key space iterator
// and verifies the sequence of Next and Rewind calls it makes. Rewind is
// expected after every Next that reports engine.ErrEndOfListing (even when
// objects are returned alongside it) and never after any other error.
func TestIteratorContract(t *testing.T) {
	addr := oidtest.Address()
	objs := []objectcore.AddressWithType{{
		Address: addr,
		// The SDK object package is imported under the objectSDK alias.
		Type: objectSDK.TypeRegular,
	}}

	// The container is always missing and burying always succeeds, so object
	// processing itself never influences the iterator calls.
	containerSrc := func(id cid.ID) (*container.Container, error) {
		return nil, apistatus.ContainerNotFound{}
	}
	buryFn := func(ctx context.Context, a oid.Address) error {
		return nil
	}

	pool, err := ants.NewPool(4)
	require.NoError(t, err)
	t.Cleanup(pool.Release)

	// Each entry is the result of one Next call, in order.
	it := &predefinedIterator{
		scenario: []nextResult{
			{objs, nil},
			{nil, errors.New("opaque")},
			{nil, engine.ErrEndOfListing},
			{nil, engine.ErrEndOfListing},
			{nil, errors.New("opaque")},
			{objs, engine.ErrEndOfListing},
		},
		finishCh: make(chan struct{}),
	}

	p := New(
		WithKeySpaceIterator(it),
		WithContainerSource(containerSrcFunc(containerSrc)),
		WithBuryFunc(buryFn),
		WithPool(pool),
		WithNodeLoader(constNodeLoader(0)),
		func(c *cfg) {
			// Presumably the pause the policer takes between passes; kept
			// tiny so the scripted scenario completes quickly.
			c.sleepDuration = time.Millisecond
		},
	)

	ctx, cancel := context.WithCancel(context.Background())
	go p.Run(ctx)

	// finishCh is closed by the iterator once the whole scenario has been
	// consumed; only then is it.calls read.
	<-it.finishCh
	cancel()
	require.Equal(t, []string{
		"Next",
		"Next",
		"Next",
		"Rewind",
		"Next",
		"Rewind",
		"Next",
		"Next",
		"Rewind",
	}, it.calls)
}

// eqAddr reports whether a and b name the same container and the same object.
//
// TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101)
func eqAddr(a, b oid.Address) bool {
	if !a.Container().Equals(b.Container()) {
		return false
	}
	return a.Object().Equals(b.Object())
}

// nextResult is one scripted outcome of predefinedIterator.Next.
type nextResult struct {
	// objs is the batch returned by the call.
	objs []objectcore.AddressWithType
	// err is the error returned together with objs.
	err  error
}

// predefinedIterator is an iterator test double that replays a fixed list of
// Next results and records the name of every call made to it.
type predefinedIterator struct {
	// scenario holds the results handed out by successive Next calls.
	scenario []nextResult
	// finishCh is closed by Next once the scenario is exhausted.
	finishCh chan struct{}
	// pos is the index of the next scenario entry to return.
	pos      int
	// calls is the log of method names ("Next", "Rewind") in call order.
	calls    []string
}

// Next replays the next scripted result and logs the call. Once the scenario
// has been used up it closes finishCh, waits for ctx to be done and returns
// (nil, nil) without logging anything.
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) {
	if it.pos != len(it.scenario) {
		step := it.scenario[it.pos]
		it.pos++
		it.calls = append(it.calls, "Next")
		return step.objs, step.err
	}

	// Scenario exhausted: signal the test, then park until cancellation.
	close(it.finishCh)
	<-ctx.Done()
	return nil, nil
}

// Rewind only logs the call; the scenario position is left untouched.
func (it *predefinedIterator) Rewind() {
	const call = "Rewind"
	it.calls = append(it.calls, call)
}

// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
type sliceKeySpaceIterator struct {
	// objs is the full list of objects to iterate over.
	objs []objectcore.AddressWithType
	// cur is the index of the first object not yet returned by Next.
	cur  int
}

// Next returns up to size objects starting at the current position and
// advances past them. When every object has already been returned it yields
// engine.ErrEndOfListing instead.
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
	total := len(it.objs)
	if it.cur >= total {
		return nil, engine.ErrEndOfListing
	}

	start := it.cur
	stop := start + int(size)
	if stop > total {
		// Clamp the final batch to what is left in the slice.
		stop = total
	}

	it.cur = stop
	return it.objs[start:stop], nil
}

// Rewind resets the position so that the following Next call starts again
// from the first object of the slice.
func (it *sliceKeySpaceIterator) Rewind() {
	it.cur = 0
}

// containerSrcFunc lets a plain function serve as a container.Source.
type containerSrcFunc func(cid.ID) (*container.Container, error)

// Get forwards the lookup of id to the wrapped function.
func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) {
	return f(id)
}

// placementBuilderFunc lets a plain function serve as a placement.Builder.
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)

// BuildPlacement forwards all arguments to the wrapped function and returns
// its result unchanged.
func (f placementBuilderFunc) BuildPlacement(cnr cid.ID, obj *oid.ID, policy netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
	vectors, err := f(cnr, obj, policy)
	return vectors, err
}

// announcedKeysFunc lets a plain predicate serve as a netmap.AnnouncedKeys.
type announcedKeysFunc func([]byte) bool

// IsLocalKey returns whatever the wrapped predicate answers for key.
func (f announcedKeysFunc) IsLocalKey(key []byte) bool {
	isLocal := f(key)
	return isLocal
}

// constNodeLoader is a nodeLoader whose reported load never changes: it is
// the numeric value of the constNodeLoader itself.
type constNodeLoader float64

// ObjectServiceLoad returns the fixed load value.
func (l constNodeLoader) ObjectServiceLoad() float64 {
	load := float64(l)
	return load
}

// replicatorFunc lets a plain function serve as a Replicator.
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)

// HandleTask hands the task and its result sink straight to the wrapped
// function.
func (fn replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
	fn(ctx, task, res)
}