import logging
import random
import sys

import allure
import pytest
from frostfs_testlib.resources.error_patterns import (
    INVALID_LENGTH_SPECIFIER,
    INVALID_OFFSET_SPECIFIER,
    INVALID_RANGE_OVERFLOW,
    INVALID_RANGE_ZERO_LENGTH,
    OUT_OF_RANGE,
)
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import (
    get_object_from_random_node,
    get_range,
    get_range_hash,
    head_object,
    put_object_to_random_node,
    search_object,
)
from frostfs_testlib.steps.complex_object_actions import get_complex_object_split_ranges
from frostfs_testlib.steps.storage_object import delete_objects
from frostfs_testlib.steps.storage_policy import get_complex_object_copies, get_simple_object_copies
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.file_utils import generate_file, get_file_content, get_file_hash
from pytest import FixtureRequest

logger = logging.getLogger("NeoLogger")

CLEANUP_TIMEOUT = 10
COMMON_ATTRIBUTE = {"common_key": "common_value"}
# One object will be uploaded for each attribute set
OBJECT_ATTRIBUTES = [
    None,
    {"key1": 1, "key2": "abc", "common_key": "common_value"},
    {"key1": 2, "common_key": "common_value"},
]

# Config for Range tests
RANGES_COUNT = 4  # by quarters
RANGE_MIN_LEN = 10
RANGE_MAX_LEN = 500
# Pinned static ranges that previously exposed issues,
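# keyed by object size: {object_size: [(offset, length), ...]}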
STATIC_RANGES = {}


def generate_ranges(
    storage_object: StorageObjectInfo, max_object_size: int, shell: Shell, cluster: Cluster
) -> list[tuple[int, int]]:
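    """Generate a list of (offset, length) ranges to test for the given object.

    For a simple object all quarter ranges are tested directly; for a complex
    object the ranges crossing split boundaries are used instead. Edge ranges at
    the start and end of the payload, a random range within each quarter, and
    any pinned STATIC_RANGES are always appended.
    """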
    file_range_step = storage_object.size / RANGES_COUNT

    file_ranges = []
    file_ranges_to_test = []

    for i in range(RANGES_COUNT):
        file_ranges.append((int(file_range_step * i), int(file_range_step)))

    # For a simple object we can read all file ranges without spending too much time on testing
    if storage_object.size < max_object_size:
        file_ranges_to_test.extend(file_ranges)
    # For a complex object we would need to fetch multiple child objects from different nodes, so test only selected ranges.
    else:
        assert (
            storage_object.size >= RANGE_MAX_LEN + max_object_size
        ), f"Complex object size should be at least {max_object_size + RANGE_MAX_LEN}. Current: {storage_object.size}"
        file_ranges_to_test.append((RANGE_MAX_LEN, max_object_size - RANGE_MAX_LEN))
        file_ranges_to_test.extend(get_complex_object_split_ranges(storage_object, shell, cluster))

    # Special cases to read some bytes from start and some bytes from end of object
    file_ranges_to_test.append((0, RANGE_MIN_LEN))
    file_ranges_to_test.append((storage_object.size - RANGE_MIN_LEN, RANGE_MIN_LEN))

    for offset, length in file_ranges:
        range_length = random.randint(RANGE_MIN_LEN, RANGE_MAX_LEN)
        # randint is inclusive on both ends, so cap the start at the last byte of the quarter
        # to avoid producing a zero-length range at the very end of the object
        range_start = random.randint(offset, offset + length - 1)

        file_ranges_to_test.append(
            (range_start, min(range_length, storage_object.size - range_start))
        )

    file_ranges_to_test.extend(STATIC_RANGES.get(storage_object.size, []))

    return file_ranges_to_test


@pytest.fixture(
    params=[pytest.lazy_fixture("simple_object_size"), pytest.lazy_fixture("complex_object_size")],
    ids=["simple object size", "complex object size"],
    # Scope module to upload/delete each file set only once
    scope="module",
)
def storage_objects(
    default_wallet: str, client_shell: Shell, cluster: Cluster, request: FixtureRequest
) -> list[StorageObjectInfo]:
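    """Upload one object per OBJECT_ATTRIBUTES entry into a fresh container; delete them all on teardown."""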
    wallet = default_wallet
    # Separate containers for complex/simple objects to avoid side-effects
    cid = create_container(wallet, shell=client_shell, endpoint=cluster.default_rpc_endpoint)
    object_size: ObjectSize = request.param

    file_path = generate_file(object_size.value)
    file_hash = get_file_hash(file_path)

    storage_objects = []

    with allure.step("Put objects"):
        # We need to upload objects multiple times with different attributes
        for attributes in OBJECT_ATTRIBUTES:
            storage_object_id = put_object_to_random_node(
                wallet=wallet,
                path=file_path,
                cid=cid,
                shell=client_shell,
                cluster=cluster,
                attributes=attributes,
            )

            storage_object = StorageObjectInfo(cid, storage_object_id)
            storage_object.size = object_size.value
            storage_object.wallet_file_path = wallet
            storage_object.file_path = file_path
            storage_object.file_hash = file_hash
            storage_object.attributes = attributes

            storage_objects.append(storage_object)

    yield storage_objects

    # Teardown after all tests with the current param are done
    delete_objects(storage_objects, client_shell, cluster)


@pytest.mark.sanity
@pytest.mark.grpc_api
class TestObjectApi(ClusterTestBase):
    @allure.title("Validate object storage policy by native API for {storage_objects}")
    def test_object_storage_policies(
        self,
        request: FixtureRequest,
        storage_objects: list[StorageObjectInfo],
        simple_object_size: ObjectSize,
    ):
        """
        Validate object storage policy
        """
        allure.dynamic.title(
            f"Validate object storage policy by native API for {request.node.callspec.id}"
        )

        with allure.step("Validate storage policy for objects"):
            for storage_object in storage_objects:
                if storage_object.size == simple_object_size.value:
                    copies = get_simple_object_copies(
                        storage_object.wallet_file_path,
                        storage_object.cid,
                        storage_object.oid,
                        shell=self.shell,
                        nodes=self.cluster.storage_nodes,
                    )
                else:
                    copies = get_complex_object_copies(
                        storage_object.wallet_file_path,
                        storage_object.cid,
                        storage_object.oid,
                        shell=self.shell,
                        nodes=self.cluster.storage_nodes,
                    )
                assert copies == 2, "Expected 2 copies"

    @allure.title("Validate get object native API for {storage_objects}")
    def test_get_object_api(
        self, request: FixtureRequest, storage_objects: list[StorageObjectInfo]
    ):
        """
        Validate get object native API
        """
        allure.dynamic.title(f"Validate get object native API for {request.node.callspec.id}")

        with allure.step("Get objects and compare hashes"):
            for storage_object in storage_objects:
                file_path = get_object_from_random_node(
                    storage_object.wallet_file_path,
                    storage_object.cid,
                    storage_object.oid,
                    self.shell,
                    cluster=self.cluster,
                )
                file_hash = get_file_hash(file_path)
                assert storage_object.file_hash == file_hash

    @allure.title("Validate head object native API for {storage_objects}")
    def test_head_object_api(
        self, request: FixtureRequest, storage_objects: list[StorageObjectInfo]
    ):
        """
        Validate head object native API
        """
        allure.dynamic.title(f"Validate head object by native API for {request.node.callspec.id}")

        storage_object_1 = storage_objects[0]
        storage_object_2 = storage_objects[1]

        with allure.step("Head object and validate"):
            head_object(
                storage_object_1.wallet_file_path,
                storage_object_1.cid,
                storage_object_1.oid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )
            head_info = head_object(
                storage_object_2.wallet_file_path,
                storage_object_2.cid,
                storage_object_2.oid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )
            self.check_header_is_presented(head_info, storage_object_2.attributes)

    @allure.title("Validate object search by native API for {storage_objects}")
    def test_search_object_api(
        self, request: FixtureRequest, storage_objects: list[StorageObjectInfo]
    ):
        """
        Validate object search by native API
        """
        allure.dynamic.title(f"Validate object search by native API for {request.node.callspec.id}")

        oids = [storage_object.oid for storage_object in storage_objects]
        wallet = storage_objects[0].wallet_file_path
        cid = storage_objects[0].cid

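        # Each attribute set should match only the object uploaded with it;
        # COMMON_ATTRIBUTE is shared by both attribute sets, so it matches both of those objects,
        # while oids[0] (uploaded without attributes) is returned only by the unfiltered search.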
        test_table = [
            (OBJECT_ATTRIBUTES[1], oids[1:2]),
            (OBJECT_ATTRIBUTES[2], oids[2:3]),
            (COMMON_ATTRIBUTE, oids[1:3]),
        ]

        with allure.step("Search objects"):
            # Search with no attributes
            result = search_object(
                wallet,
                cid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                expected_objects_list=oids,
                root=True,
            )
            assert sorted(oids) == sorted(result)

            # Search by the filters from the test table
            for object_filter, expected_oids in test_table:
                result = search_object(
                    wallet,
                    cid,
                    shell=self.shell,
                    endpoint=self.cluster.default_rpc_endpoint,
                    filters=object_filter,
                    expected_objects_list=expected_oids,
                    root=True,
                )
                assert sorted(expected_oids) == sorted(result)

    @allure.title("Validate object search with removed items for {object_size}")
    @pytest.mark.parametrize(
        "object_size",
        [pytest.lazy_fixture("simple_object_size"), pytest.lazy_fixture("complex_object_size")],
        ids=["simple object size", "complex object size"],
    )
    def test_object_search_should_return_tombstone_items(
        self, default_wallet: str, object_size: ObjectSize
    ):
        """
        Validate object search with removed items
        """

        wallet = default_wallet
        cid = create_container(wallet, self.shell, self.cluster.default_rpc_endpoint)

        with allure.step("Upload file"):
            file_path = generate_file(object_size.value)
            file_hash = get_file_hash(file_path)

            storage_object = StorageObjectInfo(
                cid=cid,
                oid=put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster),
                size=object_size.value,
                wallet_file_path=wallet,
                file_path=file_path,
                file_hash=file_hash,
            )

        with allure.step("Search object"):
            # Root search should return the root object oid
            result = search_object(
                wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, root=True
            )
            assert result == [storage_object.oid]

        with allure.step("Delete file"):
            delete_objects([storage_object], self.shell, self.cluster)

        with allure.step("Search deleted object with --root"):
            # Root search should return nothing
            result = search_object(
                wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, root=True
            )
            assert len(result) == 0

        with allure.step("Search deleted object with --phy should return only tombstones"):
            # Physical search should return only tombstones
            result = search_object(
                wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, phy=True
            )
            assert (
                storage_object.tombstone in result
            ), "Search result should contain tombstone of removed object"
            assert (
                storage_object.oid not in result
            ), "Search result should not contain ObjectId of removed object"
            for tombstone_oid in result:
                header = head_object(
                    wallet,
                    cid,
                    tombstone_oid,
                    shell=self.shell,
                    endpoint=self.cluster.default_rpc_endpoint,
                )["header"]
                object_type = header["objectType"]
                assert (
                    object_type == "TOMBSTONE"
                ), f"Object wasn't deleted properly. Found object {tombstone_oid} with type {object_type}"

    @allure.title("Validate native get_range_hash object API for {storage_objects}")
    @pytest.mark.sanity
    @pytest.mark.grpc_api
    def test_object_get_range_hash(
        self, request: FixtureRequest, storage_objects: list[StorageObjectInfo], max_object_size: int
    ):
        """
        Validate get_range_hash for object by native gRPC API
        """
        allure.dynamic.title(
            f"Validate native get_range_hash object API for {request.node.callspec.id}"
        )

        wallet = storage_objects[0].wallet_file_path
        cid = storage_objects[0].cid
        oids = [storage_object.oid for storage_object in storage_objects[:2]]
        file_path = storage_objects[0].file_path

        file_ranges_to_test = generate_ranges(
            storage_objects[0], max_object_size, self.shell, self.cluster
        )
        logging.info(f"Ranges used in test {file_ranges_to_test}")

        for range_start, range_len in file_ranges_to_test:
            range_cut = f"{range_start}:{range_len}"
            with allure.step(f"Get range hash ({range_cut})"):
                for oid in oids:
                    range_hash = get_range_hash(
                        wallet,
                        cid,
                        oid,
                        shell=self.shell,
                        endpoint=self.cluster.default_rpc_endpoint,
                        range_cut=range_cut,
                    )
                    assert (
                        get_file_hash(file_path, range_len, range_start) == range_hash
                    ), f"Expected range hash to match {range_cut} slice of file payload"

    @allure.title("Validate native get_range object API for {storage_objects}")
    @pytest.mark.sanity
    @pytest.mark.grpc_api
    def test_object_get_range(
        self, request: FixtureRequest, storage_objects: list[StorageObjectInfo], max_object_size: int
    ):
        """
        Validate get_range for object by native gRPC API
        """
        allure.dynamic.title(f"Validate native get_range object API for {request.node.callspec.id}")

        wallet = storage_objects[0].wallet_file_path
        cid = storage_objects[0].cid
        oids = [storage_object.oid for storage_object in storage_objects[:2]]
        file_path = storage_objects[0].file_path

        file_ranges_to_test = generate_ranges(
            storage_objects[0], max_object_size, self.shell, self.cluster
        )
        logging.info(f"Ranges used in test {file_ranges_to_test}")

        for range_start, range_len in file_ranges_to_test:
            range_cut = f"{range_start}:{range_len}"
            with allure.step(f"Get range ({range_cut})"):
                for oid in oids:
                    _, range_content = get_range(
                        wallet,
                        cid,
                        oid,
                        shell=self.shell,
                        endpoint=self.cluster.default_rpc_endpoint,
                        range_cut=range_cut,
                    )
                    assert (
                        get_file_content(
                            file_path, content_len=range_len, mode="rb", offset=range_start
                        )
                        == range_content
                    ), f"Expected range content to match {range_cut} slice of file payload"

    @allure.title(
        "[NEGATIVE] Invalid range in get_range native object API should return error for {storage_objects}"
    )
    @pytest.mark.sanity
    @pytest.mark.grpc_api
    def test_object_get_range_negatives(
        self,
        request: FixtureRequest,
        storage_objects: list[StorageObjectInfo],
    ):
        """
        Validate get_range negative for object by native gRPC API
        """
        allure.dynamic.title(
            f"[NEGATIVE] Invalid range in get_range native object API should return error for {request.node.callspec.id}"
        )

        wallet = storage_objects[0].wallet_file_path
        cid = storage_objects[0].cid
        oids = [storage_object.oid for storage_object in storage_objects[:2]]
        file_size = storage_objects[0].size

        assert (
            RANGE_MIN_LEN < file_size
        ), f"Incorrect test setup. File size ({file_size}) must be greater than RANGE_MIN_LEN ({RANGE_MIN_LEN})"

        file_ranges_to_test: list[tuple[int, int, str]] = [
            # Offset is bigger than the file size, the length is small.
            (file_size + 1, RANGE_MIN_LEN, OUT_OF_RANGE),
            # Offset is ok, but offset+length is too big.
            (file_size - RANGE_MIN_LEN, RANGE_MIN_LEN * 2, OUT_OF_RANGE),
            # Offset is ok, but the length is huge (MaxUint64), so offset+length wraps around and still looks "valid".
            (RANGE_MIN_LEN, sys.maxsize * 2 + 1, INVALID_RANGE_OVERFLOW),
            # Length is zero
            (10, 0, INVALID_RANGE_ZERO_LENGTH),
            # Negative values
            (-1, 1, INVALID_OFFSET_SPECIFIER),
            (10, -5, INVALID_LENGTH_SPECIFIER),
        ]

        for range_start, range_len, expected_error in file_ranges_to_test:
            range_cut = f"{range_start}:{range_len}"
            expected_error = (
                expected_error.format(range=range_cut)
                if "{range}" in expected_error
                else expected_error
            )
            with allure.step(f"Get range ({range_cut})"):
                for oid in oids:
                    with pytest.raises(Exception, match=expected_error):
                        get_range(
                            wallet,
                            cid,
                            oid,
                            shell=self.shell,
                            endpoint=self.cluster.default_rpc_endpoint,
                            range_cut=range_cut,
                        )

    @allure.title(
        "[NEGATIVE] Invalid range in get_range_hash native object API should return error for {storage_objects}"
    )
    def test_object_get_range_hash_negatives(
        self,
        request: FixtureRequest,
        storage_objects: list[StorageObjectInfo],
    ):
        """
        Validate get_range_hash negative for object by native gRPC API
        """
        allure.dynamic.title(
            f"[NEGATIVE] Invalid range in get_range_hash native object API should return error for {request.node.callspec.id}"
        )

        wallet = storage_objects[0].wallet_file_path
        cid = storage_objects[0].cid
        oids = [storage_object.oid for storage_object in storage_objects[:2]]
        file_size = storage_objects[0].size

        assert (
            RANGE_MIN_LEN < file_size
        ), f"Incorrect test setup. File size ({file_size}) must be greater than RANGE_MIN_LEN ({RANGE_MIN_LEN})"

        file_ranges_to_test: list[tuple[int, int, str]] = [
            # Offset is bigger than the file size, the length is small.
            (file_size + 1, RANGE_MIN_LEN, OUT_OF_RANGE),
            # Offset is ok, but offset+length is too big.
            (file_size - RANGE_MIN_LEN, RANGE_MIN_LEN * 2, OUT_OF_RANGE),
            # Offset is ok, but the length is huge (MaxUint64), so offset+length wraps around and still looks "valid".
            (RANGE_MIN_LEN, sys.maxsize * 2 + 1, INVALID_RANGE_OVERFLOW),
            # Length is zero
            (10, 0, INVALID_RANGE_ZERO_LENGTH),
            # Negative values
            (-1, 1, INVALID_OFFSET_SPECIFIER),
            (10, -5, INVALID_LENGTH_SPECIFIER),
        ]

        for range_start, range_len, expected_error in file_ranges_to_test:
            range_cut = f"{range_start}:{range_len}"
            expected_error = (
                expected_error.format(range=range_cut)
                if "{range}" in expected_error
                else expected_error
            )
            with allure.step(f"Get range hash ({range_cut})"):
                for oid in oids:
                    with pytest.raises(Exception, match=expected_error):
                        get_range_hash(
                            wallet,
                            cid,
                            oid,
                            shell=self.shell,
                            endpoint=self.cluster.default_rpc_endpoint,
                            range_cut=range_cut,
                        )

    def check_header_is_presented(self, head_info: dict, object_header: dict) -> None:
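        """Check that each attribute from object_header is present in head_info with the expected value."""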
        for key_to_check, val_to_check in object_header.items():
            assert (
                key_to_check in head_info["header"]["attributes"]
            ), f"Key {key_to_check} is not found in header attributes"
            assert head_info["header"]["attributes"].get(key_to_check) == str(
                val_to_check
            ), f"Value of attribute {key_to_check} does not match expected {val_to_check}"