import logging
import random
import sys

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli
from frostfs_testlib.resources.error_patterns import (
    INVALID_LENGTH_SPECIFIER,
    INVALID_OFFSET_SPECIFIER,
    INVALID_RANGE_OVERFLOW,
    INVALID_RANGE_ZERO_LENGTH,
    OBJECT_ALREADY_REMOVED,
    OUT_OF_RANGE,
)
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import DEFAULT_EC_PLACEMENT_RULE, DEFAULT_PLACEMENT_RULE, search_nodes_with_container
from frostfs_testlib.steps.cli.object import (
    get_object_from_random_node,
    get_range,
    get_range_hash,
    head_object,
    put_object,
    put_object_to_random_node,
    search_object,
)
from frostfs_testlib.steps.complex_object_actions import get_complex_object_split_ranges
from frostfs_testlib.steps.storage_object import delete_object, delete_objects
from frostfs_testlib.steps.storage_policy import get_complex_object_copies, get_simple_object_copies
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.policy import PlacementPolicy
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.file_utils import TestFile, get_file_content, get_file_hash

from ...helpers.container_creation import create_container_with_ape
from ...helpers.container_request import APE_EVERYONE_ALLOW_ALL, ContainerRequest

logger = logging.getLogger("NeoLogger")

CLEANUP_TIMEOUT = 10
COMMON_ATTRIBUTE = {"common_key": "common_value"}
COMMON_CONTAINER_RULE = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
# An object will be uploaded for each attribute set
OBJECT_ATTRIBUTES = [
    None,
    {"key1": 1, "key2": "abc", "common_key": "common_value"},
    {"key1": 2, "common_key": "common_value"},
]

# Config for range tests
RANGES_COUNT = 4  # split the object into quarters
RANGE_MIN_LEN = 10
RANGE_MAX_LEN = 500
# Static ranges that previously exposed issues, keyed by object size:
# {object_size: [(offset, length), ...]}
STATIC_RANGES = {}


def generate_ranges(storage_object: StorageObjectInfo, max_object_size: int, shell: Shell, cluster: Cluster) -> list[tuple[int, int]]:
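    # Split the payload into RANGES_COUNT equal steps; each step seeds one random range below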
    file_range_step = storage_object.size / RANGES_COUNT

    file_ranges = []
    file_ranges_to_test = []

    for i in range(0, RANGES_COUNT):
        file_ranges.append((int(file_range_step * i), int(file_range_step)))

    # For a simple object we can read all file ranges without spending too much time on testing
    if storage_object.size < max_object_size:
        file_ranges_to_test.extend(file_ranges)
    # For a complex object we need to fetch multiple child objects from different nodes
    else:
        assert (
            storage_object.size >= RANGE_MAX_LEN + max_object_size
        ), f"Complex object size should be at least {max_object_size + RANGE_MAX_LEN}. Current: {storage_object.size}"
        file_ranges_to_test.append((RANGE_MAX_LEN, max_object_size - RANGE_MAX_LEN))
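        # Ranges aligned with the split boundaries of the complex object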
        file_ranges_to_test.extend(get_complex_object_split_ranges(storage_object, shell, cluster))

    # Special cases: read a few bytes from the start and from the end of the object
    file_ranges_to_test.append((0, RANGE_MIN_LEN))
    file_ranges_to_test.append((storage_object.size - RANGE_MIN_LEN, RANGE_MIN_LEN))

    for offset, length in file_ranges:
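        # Pick a random start within this step and clamp the length so the range stays inside the object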
        range_length = random.randint(RANGE_MIN_LEN, RANGE_MAX_LEN)
        range_start = random.randint(offset, offset + length - 1)

        file_ranges_to_test.append((range_start, min(range_length, storage_object.size - range_start)))

    file_ranges_to_test.extend(STATIC_RANGES.get(storage_object.size, []))

    return file_ranges_to_test


@pytest.fixture(scope="module")
def common_container(
    frostfs_cli: FrostfsCli,
    default_wallet: WalletInfo,
    client_shell: Shell,
    cluster: Cluster,
    rpc_endpoint: str,
) -> str:
    container_request = ContainerRequest(COMMON_CONTAINER_RULE, APE_EVERYONE_ALLOW_ALL)
    return create_container_with_ape(container_request, frostfs_cli, default_wallet, client_shell, cluster, rpc_endpoint)


@pytest.fixture(scope="module")
def container_nodes(default_wallet: WalletInfo, client_shell: Shell, cluster: Cluster, common_container: str) -> list[ClusterNode]:
    return search_nodes_with_container(default_wallet, common_container, client_shell, cluster.default_rpc_endpoint, cluster)


@pytest.fixture(scope="module")
def non_container_nodes(cluster: Cluster, container_nodes: list[ClusterNode]) -> list[ClusterNode]:
    return list(set(cluster.cluster_nodes) - set(container_nodes))


@pytest.fixture(
    # Module scope to upload/delete each file set only once
    scope="module"
)
def storage_objects(
    default_wallet: WalletInfo,
    client_shell: Shell,
    cluster: Cluster,
    object_size: ObjectSize,
    test_file_module: TestFile,
    frostfs_cli: FrostfsCli,
    placement_policy: PlacementPolicy,
    rpc_endpoint: str,
) -> list[StorageObjectInfo]:
    cid = create_container_with_ape(
        ContainerRequest(placement_policy.value, APE_EVERYONE_ALLOW_ALL), frostfs_cli, default_wallet, client_shell, cluster, rpc_endpoint
    )

    with reporter.step("Generate file"):
        file_hash = get_file_hash(test_file_module.path)
    storage_objects = []

    with reporter.step("Put objects"):
        # Upload the same payload once per attribute set
        for attributes in OBJECT_ATTRIBUTES:
            storage_object_id = put_object_to_random_node(
                default_wallet,
                test_file_module.path,
                cid,
                client_shell,
                cluster,
                attributes=attributes,
            )

            storage_object = StorageObjectInfo(cid, storage_object_id)
            storage_object.size = object_size.value
            storage_object.wallet = default_wallet
            storage_object.file_path = test_file_module.path
            storage_object.file_hash = file_hash
            storage_object.attributes = attributes

            storage_objects.append(storage_object)

    yield storage_objects

    # Teardown: delete uploaded objects after all tests with the current parameter set are done
    delete_objects(storage_objects, client_shell, cluster)


@pytest.fixture()
def expected_object_copies(placement_policy: PlacementPolicy) -> int:
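    # REP policy stores 2 full copies; the EC policy is expected to yield 4 stored chunks (data + parity)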
    if placement_policy.name == "rep":
        return 2
    return 4


@pytest.mark.nightly
@pytest.mark.sanity
@pytest.mark.grpc_api
class TestObjectApi(ClusterTestBase):
    @allure.title("Storage policy by native API (obj_size={object_size}, policy={placement_policy})")
    def test_object_storage_policies(
        self,
        storage_objects: list[StorageObjectInfo],
        simple_object_size: ObjectSize,
        expected_object_copies: int,
    ):
        """
        Validate object storage policy
        """

        with reporter.step("Validate storage policy for objects"):
            for storage_object in storage_objects:
                if storage_object.size == simple_object_size.value:
                    copies = get_simple_object_copies(
                        storage_object.wallet,
                        storage_object.cid,
                        storage_object.oid,
                        shell=self.shell,
                        nodes=self.cluster.storage_nodes,
                    )
                else:
                    copies = get_complex_object_copies(
                        storage_object.wallet,
                        storage_object.cid,
                        storage_object.oid,
                        shell=self.shell,
                        nodes=self.cluster.storage_nodes,
                    )
                assert copies == expected_object_copies, f"Expected {expected_object_copies} copies, got {copies}"

    @allure.title("Get object by native API (obj_size={object_size}, policy={placement_policy})")
    def test_get_object_api(self, storage_objects: list[StorageObjectInfo]):
        """
        Validate get object native API
        """

        with reporter.step("Get objects and compare hashes"):
            for storage_object in storage_objects:
                file_path = get_object_from_random_node(
                    storage_object.wallet,
                    storage_object.cid,
                    storage_object.oid,
                    self.shell,
                    cluster=self.cluster,
                )
                file_hash = get_file_hash(file_path)
                assert storage_object.file_hash == file_hash, "Object payload hash should match the original file hash"

    @allure.title("Head object by native API (obj_size={object_size}, policy={placement_policy})")
    def test_head_object_api(self, storage_objects: list[StorageObjectInfo]):
        """
        Validate head object native API
        """

        storage_object_1 = storage_objects[0]
        storage_object_2 = storage_objects[1]

        with reporter.step("Head object and validate"):
            head_object(
                storage_object_1.wallet,
                storage_object_1.cid,
                storage_object_1.oid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )
            head_info = head_object(
                storage_object_2.wallet,
                storage_object_2.cid,
                storage_object_2.oid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )
            self.check_header_is_presented(head_info, storage_object_2.attributes)

    @allure.title("Head deleted object with --raw arg (obj_size={object_size}, policy={container_request})")
    @pytest.mark.parametrize(
        "container_request",
        [
            ContainerRequest(DEFAULT_PLACEMENT_RULE, APE_EVERYONE_ALLOW_ALL, "rep"),
            ContainerRequest(DEFAULT_EC_PLACEMENT_RULE, APE_EVERYONE_ALLOW_ALL, "ec"),
        ],
        indirect=True,
        ids=["rep", "ec"],
    )
    def test_object_head_raw(self, default_wallet: WalletInfo, container: str, test_file: TestFile):
        with reporter.step("Upload object"):
            oid = put_object_to_random_node(default_wallet, test_file.path, container, self.shell, self.cluster)

        with reporter.step("Delete object"):
            delete_object(default_wallet, container, oid, self.shell, self.cluster.default_rpc_endpoint)

        with reporter.step("Call object head --raw and expect error"):
            with pytest.raises(Exception, match=OBJECT_ALREADY_REMOVED):
                head_object(default_wallet, container, oid, self.shell, self.cluster.default_rpc_endpoint, is_raw=True)

    @allure.title("Search objects by native API (obj_size={object_size}, policy={placement_policy})")
    def test_search_object_api(self, storage_objects: list[StorageObjectInfo]):
        """
        Validate object search by native API
        """

        oids = [storage_object.oid for storage_object in storage_objects]
        wallet = storage_objects[0].wallet
        cid = storage_objects[0].cid

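        # Each entry: (search filters, OIDs expected to match)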
        test_table = [
            (OBJECT_ATTRIBUTES[1], oids[1:2]),
            (OBJECT_ATTRIBUTES[2], oids[2:3]),
            (COMMON_ATTRIBUTE, oids[1:3]),
        ]

        with reporter.step("Search objects"):
            # Search with no filters should return all objects
            result = search_object(
                wallet,
                cid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                expected_objects_list=oids,
                root=True,
            )
            assert sorted(oids) == sorted(result)

            # Search by the test table filters
            for object_filters, expected_oids in test_table:
                result = search_object(
                    wallet,
                    cid,
                    shell=self.shell,
                    endpoint=self.cluster.default_rpc_endpoint,
                    filters=object_filters,
                    expected_objects_list=expected_oids,
                    root=True,
                )
                assert sorted(expected_oids) == sorted(result)

    @allure.title("Search objects with removed items (obj_size={object_size})")
    @pytest.mark.exclude_sanity
    def test_object_search_should_return_tombstone_items(
        self,
        default_wallet: WalletInfo,
        container: str,
        object_size: ObjectSize,
        rpc_endpoint: str,
        test_file: TestFile,
    ):
        """
        Validate object search with removed items
        """

        with reporter.step("Put object to container"):
            storage_object = StorageObjectInfo(
                cid=container,
                oid=put_object_to_random_node(default_wallet, test_file.path, container, self.shell, self.cluster),
                size=object_size.value,
                wallet=default_wallet,
                file_path=test_file.path,
                file_hash=get_file_hash(test_file.path),
            )

        with reporter.step("Search object"):
            # Root search should return the root object OID
            result = search_object(default_wallet, container, self.shell, rpc_endpoint, root=True)
            objects_before_deletion = len(result)
            assert storage_object.oid in result

        with reporter.step("Delete object"):
            delete_objects([storage_object], self.shell, self.cluster)

        with reporter.step("Search deleted object with --root"):
            # Root search should return nothing after deletion
            result = search_object(default_wallet, container, self.shell, rpc_endpoint, root=True)
            assert len(result) == 0

        with reporter.step("Search deleted object with --phy should return only tombstones"):
            # Physical search should return only tombstones
            result = search_object(default_wallet, container, self.shell, rpc_endpoint, phy=True)
            assert storage_object.tombstone in result, "Search result should contain tombstone of removed object"
            assert storage_object.oid not in result, "Search result should not contain ObjectId of removed object"
            for tombstone_oid in result:
                head_info = head_object(default_wallet, container, tombstone_oid, self.shell, rpc_endpoint)
                object_type = head_info["header"]["objectType"]
                assert object_type == "TOMBSTONE", f"Object wasn't deleted properly. Found object {tombstone_oid} with type {object_type}"

    @allure.title("Get range hash by native API (obj_size={object_size}, policy={placement_policy})")
    @pytest.mark.grpc_api
    def test_object_get_range_hash(self, storage_objects: list[StorageObjectInfo], max_object_size: int):
        """
        Validate get_range_hash for object by native gRPC API
        """

        wallet = storage_objects[0].wallet
        cid = storage_objects[0].cid
        oids = [storage_object.oid for storage_object in storage_objects[:2]]
        file_path = storage_objects[0].file_path

        file_ranges_to_test = generate_ranges(storage_objects[0], max_object_size, self.shell, self.cluster)
        logger.info(f"Ranges used in test: {file_ranges_to_test}")

        for range_start, range_len in file_ranges_to_test:
            range_cut = f"{range_start}:{range_len}"
            with reporter.step(f"Get range hash ({range_cut})"):
                for oid in oids:
                    range_hash = get_range_hash(
                        wallet,
                        cid,
                        oid,
                        shell=self.shell,
                        endpoint=self.cluster.default_rpc_endpoint,
                        range_cut=range_cut,
                    )
                    assert (
                        get_file_hash(file_path, range_len, range_start) == range_hash
                    ), f"Expected range hash to match {range_cut} slice of file payload"

    @allure.title("Get range by native API (obj_size={object_size}, policy={placement_policy})")
    @pytest.mark.grpc_api
    def test_object_get_range(self, storage_objects: list[StorageObjectInfo], max_object_size: int):
        """
        Validate get_range for object by native gRPC API
        """

        wallet = storage_objects[0].wallet
        cid = storage_objects[0].cid
        oids = [storage_object.oid for storage_object in storage_objects[:2]]
        file_path = storage_objects[0].file_path

        file_ranges_to_test = generate_ranges(storage_objects[0], max_object_size, self.shell, self.cluster)
        logger.info(f"Ranges used in test: {file_ranges_to_test}")

        for range_start, range_len in file_ranges_to_test:
            range_cut = f"{range_start}:{range_len}"
            with reporter.step(f"Get range ({range_cut})"):
                for oid in oids:
                    _, range_content = get_range(
                        wallet,
                        cid,
                        oid,
                        shell=self.shell,
                        endpoint=self.cluster.default_rpc_endpoint,
                        range_cut=range_cut,
                    )
                    assert (
                        get_file_content(file_path, content_len=range_len, mode="rb", offset=range_start) == range_content
                    ), f"Expected range content to match {range_cut} slice of file payload"

    @allure.title("[NEGATIVE] Get invalid range by native API (obj_size={object_size}, policy={placement_policy})")
    @pytest.mark.grpc_api
    def test_object_get_range_negatives(
        self,
        storage_objects: list[StorageObjectInfo],
    ):
        """
        Validate get_range negative for object by native gRPC API
        """

        wallet = storage_objects[0].wallet
        cid = storage_objects[0].cid
        oids = [storage_object.oid for storage_object in storage_objects[:2]]
        file_size = storage_objects[0].size

        assert RANGE_MIN_LEN < file_size, f"Incorrect test setup. File size ({file_size}) must be greater than RANGE_MIN_LEN ({RANGE_MIN_LEN})"

        file_ranges_to_test: list[tuple[int, int, str]] = [
            # Offset is bigger than the file size, the length is small.
            (file_size + 1, RANGE_MIN_LEN, OUT_OF_RANGE),
            # Offset is ok, but offset+length is too big.
            (file_size - RANGE_MIN_LEN, RANGE_MIN_LEN * 2, OUT_OF_RANGE),
            # Offset is ok, but the length is huge (sys.maxsize * 2 + 1 == 2**64 - 1, i.e. MaxUint64), so offset+length wraps around and still looks "valid".
            (RANGE_MIN_LEN, sys.maxsize * 2 + 1, INVALID_RANGE_OVERFLOW),
            # Length is zero
            (10, 0, INVALID_RANGE_ZERO_LENGTH),
            # Negative values
            (-1, 1, INVALID_OFFSET_SPECIFIER),
            (10, -5, INVALID_LENGTH_SPECIFIER),
        ]

        for range_start, range_len, expected_error in file_ranges_to_test:
            range_cut = f"{range_start}:{range_len}"
            expected_error = expected_error.format(range=range_cut) if "{range}" in expected_error else expected_error
            with reporter.step(f"Get range ({range_cut})"):
                for oid in oids:
                    with pytest.raises(Exception, match=expected_error):
                        get_range(
                            wallet,
                            cid,
                            oid,
                            shell=self.shell,
                            endpoint=self.cluster.default_rpc_endpoint,
                            range_cut=range_cut,
                        )

    @allure.title("[NEGATIVE] Get invalid range hash by native API (obj_size={object_size}, policy={placement_policy})")
    def test_object_get_range_hash_negatives(
        self,
        storage_objects: list[StorageObjectInfo],
    ):
        """
        Validate get_range_hash negative for object by native gRPC API
        """

        wallet = storage_objects[0].wallet
        cid = storage_objects[0].cid
        oids = [storage_object.oid for storage_object in storage_objects[:2]]
        file_size = storage_objects[0].size

        assert RANGE_MIN_LEN < file_size, f"Incorrect test setup. File size ({file_size}) must be greater than RANGE_MIN_LEN ({RANGE_MIN_LEN})"

        file_ranges_to_test: list[tuple[int, int, str]] = [
            # Offset is bigger than the file size, the length is small.
            (file_size + 1, RANGE_MIN_LEN, OUT_OF_RANGE),
            # Offset is ok, but offset+length is too big.
            (file_size - RANGE_MIN_LEN, RANGE_MIN_LEN * 2, OUT_OF_RANGE),
            # Offset is ok, but the length is huge (sys.maxsize * 2 + 1 == 2**64 - 1, i.e. MaxUint64), so offset+length wraps around and still looks "valid".
            (RANGE_MIN_LEN, sys.maxsize * 2 + 1, INVALID_RANGE_OVERFLOW),
            # Length is zero
            (10, 0, INVALID_RANGE_ZERO_LENGTH),
            # Negative values
            (-1, 1, INVALID_OFFSET_SPECIFIER),
            (10, -5, INVALID_LENGTH_SPECIFIER),
        ]

        for range_start, range_len, expected_error in file_ranges_to_test:
            range_cut = f"{range_start}:{range_len}"
            expected_error = expected_error.format(range=range_cut) if "{range}" in expected_error else expected_error
            with reporter.step(f"Get range hash ({range_cut})"):
                for oid in oids:
                    with pytest.raises(Exception, match=expected_error):
                        get_range_hash(
                            wallet,
                            cid,
                            oid,
                            shell=self.shell,
                            endpoint=self.cluster.default_rpc_endpoint,
                            range_cut=range_cut,
                        )

    @allure.title("Get range from container and non-container nodes (object_size={object_size})")
    def test_get_range_from_different_node(
        self,
        default_wallet: WalletInfo,
        common_container: str,
        container_nodes: list[ClusterNode],
        non_container_nodes: list[ClusterNode],
        file_path: str,
    ):
        with reporter.step("Put object to container"):
            container_node = random.choice(container_nodes)
            oid = put_object(default_wallet, file_path, common_container, self.shell, container_node.storage_node.get_rpc_endpoint())

        with reporter.step("Get range from container node endpoint"):
            get_range(
                default_wallet,
                common_container,
                oid,
                "0:10",
                self.shell,
                container_node.storage_node.get_rpc_endpoint(),
            )

        with reporter.step("Get range from non-container node endpoint"):
            non_container_node = random.choice(non_container_nodes)
            get_range(
                default_wallet,
                common_container,
                oid,
                "0:10",
                self.shell,
                non_container_node.storage_node.get_rpc_endpoint(),
            )

    @allure.title("Get range hash from container and non-container nodes (object_size={object_size})")
    def test_get_range_hash_from_different_node(
        self,
        default_wallet: WalletInfo,
        common_container: str,
        container_nodes: list[ClusterNode],
        non_container_nodes: list[ClusterNode],
        file_path: str,
    ):
        with reporter.step("Put object to container"):
            container_node = random.choice(container_nodes)
            oid = put_object(default_wallet, file_path, common_container, self.shell, container_node.storage_node.get_rpc_endpoint())

        with reporter.step("Get range hash from container node endpoint"):
            get_range_hash(
                default_wallet,
                common_container,
                oid,
                "0:10",
                self.shell,
                container_node.storage_node.get_rpc_endpoint(),
            )

        with reporter.step("Get range hash from non-container node endpoint"):
            non_container_node = random.choice(non_container_nodes)
            get_range_hash(
                default_wallet,
                common_container,
                oid,
                "0:10",
                self.shell,
                non_container_node.storage_node.get_rpc_endpoint(),
            )

    def check_header_is_presented(self, head_info: dict, object_header: dict) -> None:
        for key_to_check, val_to_check in object_header.items():
            assert key_to_check in head_info["header"]["attributes"], f"Key {key_to_check} not found in object attributes"
            assert (
                head_info["header"]["attributes"].get(key_to_check) == str(val_to_check)
            ), f"Attribute {key_to_check} expected to have value {val_to_check}"