import logging
import re

import allure
import pytest
from frostfs_testlib.resources.common import STORAGE_GC_TIME
from frostfs_testlib.resources.error_patterns import (
    LIFETIME_REQUIRED,
    LOCK_NON_REGULAR_OBJECT,
    LOCK_OBJECT_EXPIRATION,
    LOCK_OBJECT_REMOVAL,
    OBJECT_ALREADY_REMOVED,
    OBJECT_IS_LOCKED,
    OBJECT_NOT_FOUND,
)
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import (
    StorageContainer,
    StorageContainerInfo,
    create_container,
)
from frostfs_testlib.steps.cli.object import delete_object, head_object, lock_object
from frostfs_testlib.steps.complex_object_actions import get_link_object, get_storage_object_chunks
from frostfs_testlib.steps.epoch import ensure_fresh_epoch, get_epoch, tick_epoch
from frostfs_testlib.steps.node_management import drop_object
from frostfs_testlib.steps.storage_object import delete_objects
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import (
    LockObjectInfo,
    StorageObjectInfo,
)
from frostfs_testlib.storage.dataclasses.wallet import WalletFactory, WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import expect_not_raises, wait_for_success
from frostfs_testlib.utils import datetime_utils

from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes

logger = logging.getLogger("NeoLogger")

FIXTURE_LOCK_LIFETIME = 5
FIXTURE_OBJECT_LIFETIME = 10
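# Both values are expressed in epochs. The object lifetime is intentionally
# longer than the lock lifetime so that the object under test does not expire
# on its own while the lock is still active.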


@pytest.fixture(
    scope="module",
)
def user_wallet(wallet_factory: WalletFactory):
    with allure.step("Create user wallet with container"):
        wallet_file = wallet_factory.create_wallet()
        return wallet_file


@pytest.fixture(
    scope="module",
)
def user_container(user_wallet: WalletInfo, client_shell: Shell, cluster: Cluster):
    container_id = create_container(
        user_wallet.path, shell=client_shell, endpoint=cluster.default_rpc_endpoint
    )
    return StorageContainer(StorageContainerInfo(container_id, user_wallet), client_shell, cluster)


@pytest.fixture(
    scope="module",
)
def locked_storage_object(
    user_container: StorageContainer,
    client_shell: Shell,
    cluster: Cluster,
    object_size: ObjectSize,
):
    """
    This fixture provides a storage object which is NOT expected to be deleted during the test act phase.
    """
    with allure.step("Creating locked object"):
        current_epoch = ensure_fresh_epoch(client_shell, cluster)
        expiration_epoch = current_epoch + FIXTURE_LOCK_LIFETIME

        storage_object = user_container.generate_object(
            object_size.value, expire_at=current_epoch + FIXTURE_OBJECT_LIFETIME
        )
        lock_object_id = lock_object(
            storage_object.wallet_file_path,
            storage_object.cid,
            storage_object.oid,
            client_shell,
            cluster.default_rpc_endpoint,
            lifetime=FIXTURE_LOCK_LIFETIME,
        )
        storage_object.locks = [
            LockObjectInfo(
                storage_object.cid, lock_object_id, FIXTURE_LOCK_LIFETIME, expiration_epoch
            )
        ]

    yield storage_object

    with allure.step("Delete created locked object"):
        current_epoch = get_epoch(client_shell, cluster)
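        # Tick past the lock expiration epoch, otherwise the delete below would
        # be rejected because the object is still locked.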
        epoch_diff = expiration_epoch - current_epoch + 1

        if epoch_diff > 0:
            with allure.step(f"Tick {epoch_diff} epochs"):
                for _ in range(epoch_diff):
                    tick_epoch(client_shell, cluster)
        try:
            delete_object(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                client_shell,
                cluster.default_rpc_endpoint,
            )
        except Exception as ex:
            ex_message = str(ex)
            # It's okay if the object has already been removed
            if not re.search(OBJECT_NOT_FOUND, ex_message) and not re.search(
                OBJECT_ALREADY_REMOVED, ex_message
            ):
                raise ex
            logger.debug(ex_message)


@wait_for_success(datetime_utils.parse_time(STORAGE_GC_TIME))
def check_object_not_found(
    wallet_file_path: str, cid: str, oid: str, shell: Shell, rpc_endpoint: str
):
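    # Objects are removed by the storage GC asynchronously, so the check is
    # retried within one GC cycle until HEAD fails with OBJECT_NOT_FOUND.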
    with pytest.raises(Exception, match=OBJECT_NOT_FOUND):
        head_object(
            wallet_file_path,
            cid,
            oid,
            shell,
            rpc_endpoint,
        )


def verify_object_available(
    wallet_file_path: str, cid: str, oid: str, shell: Shell, rpc_endpoint: str
):
    with expect_not_raises():
        head_object(
            wallet_file_path,
            cid,
            oid,
            shell,
            rpc_endpoint,
        )


@pytest.mark.sanity
@pytest.mark.grpc_object_lock
class TestObjectLockWithGrpc(ClusterTestBase):
    @pytest.fixture()
    def new_locked_storage_object(
        self, user_container: StorageContainer, object_size: ObjectSize
    ) -> StorageObjectInfo:
        """
        This fixture provides a fresh storage object for tests that may delete or corrupt the object or its complementary objects,
        so a new one is generated each time it is requested.
        """
        with allure.step("Creating locked object"):
            current_epoch = self.get_epoch()

            storage_object = user_container.generate_object(
                object_size.value, expire_at=current_epoch + FIXTURE_OBJECT_LIFETIME
            )
            lock_object(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
                lifetime=FIXTURE_LOCK_LIFETIME,
            )

        return storage_object

    @allure.title("Locked object is protected from deletion (obj_size={object_size})")
    def test_locked_object_cannot_be_deleted(
        self,
        locked_storage_object: StorageObjectInfo,
    ):
        """
        Locked object should be protected from deletion
        """
        with pytest.raises(Exception, match=OBJECT_IS_LOCKED):
            delete_object(
                locked_storage_object.wallet_file_path,
                locked_storage_object.cid,
                locked_storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
            )

    @allure.title("Lock object itself is protected from deletion")
    # We operate only with the lock object here, so no complex object is needed in this test
    @pytest.mark.parametrize("object_size", ["simple"], indirect=True)
    def test_lock_object_itself_cannot_be_deleted(
        self,
        locked_storage_object: StorageObjectInfo,
    ):
        """
        Lock object itself should be protected from deletion
        """

        lock_object_info = locked_storage_object.locks[0]
        wallet_path = locked_storage_object.wallet_file_path

        with pytest.raises(Exception, match=LOCK_OBJECT_REMOVAL):
            delete_object(
                wallet_path,
                lock_object_info.cid,
                lock_object_info.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
            )

    @allure.title("Lock object itself cannot be locked")
    # We operate only with the lock object here, so no complex object is needed in this test
    @pytest.mark.parametrize("object_size", ["simple"], indirect=True)
    def test_lock_object_cannot_be_locked(
        self,
        locked_storage_object: StorageObjectInfo,
    ):
        """
        Lock object itself cannot be locked
        """

        lock_object_info = locked_storage_object.locks[0]
        wallet_path = locked_storage_object.wallet_file_path

        with pytest.raises(Exception, match=LOCK_NON_REGULAR_OBJECT):
            lock_object(
                wallet_path,
                lock_object_info.cid,
                lock_object_info.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
                1,
            )

    @allure.title(
        "Lock must contain valid lifetime or expire_at field: (lifetime={wrong_lifetime}, expire-at={wrong_expire_at})"
    )
    # We operate only with the lock object here, so no complex object is needed in this test
    @pytest.mark.parametrize("object_size", ["simple"], indirect=True)
    @pytest.mark.parametrize(
        "wrong_lifetime,wrong_expire_at,expected_error",
        [
            (None, None, LIFETIME_REQUIRED),
            (0, 0, LIFETIME_REQUIRED),
            (0, None, LIFETIME_REQUIRED),
            (None, 0, LIFETIME_REQUIRED),
            (-1, None, 'invalid argument "-1" for "--lifetime" flag'),
            (None, -1, 'invalid argument "-1" for "-e, --expire-at" flag'),
        ],
    )
    def test_cannot_lock_object_without_lifetime(
        self,
        locked_storage_object: StorageObjectInfo,
        wrong_lifetime: int,
        wrong_expire_at: int,
        expected_error: str,
    ):
        """
        An object cannot be locked without a valid lifetime or expire_at field
        """

        lock_object_info = locked_storage_object.locks[0]
        wallet_path = locked_storage_object.wallet_file_path

        with pytest.raises(Exception, match=expected_error):
            lock_object(
                wallet_path,
                lock_object_info.cid,
                lock_object_info.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
                lifetime=wrong_lifetime,
                expire_at=wrong_expire_at,
            )

    @allure.title("Expired object is deleted when locks are expired (obj_size={object_size})")
    def test_expired_object_should_be_deleted_after_locks_are_expired(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        Expired object should be deleted after locks are expired
        """

        current_epoch = self.ensure_fresh_epoch()
        storage_object = user_container.generate_object(
            object_size.value, expire_at=current_epoch + 1
        )

        with allure.step("Lock object for couple epochs"):
            lock_object(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
                lifetime=2,
            )
            lock_object(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
                expire_at=current_epoch + 2,
            )

        with allure.step("Check object is not deleted at expiration time"):
            self.tick_epochs(2)
            # Wait for a GC pass to make sure the object was not removed prematurely
            wait_for_gc_pass_on_storage_nodes()
            with expect_not_raises():
                head_object(
                    storage_object.wallet_file_path,
                    storage_object.cid,
                    storage_object.oid,
                    self.shell,
                    self.cluster.default_rpc_endpoint,
                )

        with allure.step("Wait for object to be deleted after third epoch"):
            self.tick_epoch()
            check_object_not_found(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
            )

    @allure.title("Lock multiple objects at once (obj_size={object_size})")
    def test_should_be_possible_to_lock_multiple_objects_at_once(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        Should be possible to lock multiple objects at once
        """

        current_epoch = ensure_fresh_epoch(self.shell, self.cluster)
        storage_objects: list[StorageObjectInfo] = []

        with allure.step("Generate three objects"):
            for _ in range(3):
                storage_objects.append(
                    user_container.generate_object(object_size.value, expire_at=current_epoch + 5)
                )

        lock_object(
            storage_objects[0].wallet_file_path,
            storage_objects[0].cid,
            ",".join([storage_object.oid for storage_object in storage_objects]),
            self.shell,
            self.cluster.default_rpc_endpoint,
            expire_at=current_epoch + 1,
        )
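        # The single LOCK object above covers all three members (their OIDs are
        # passed as one comma-separated list), so deleting any of them must fail.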

        for storage_object in storage_objects:
            with allure.step(f"Try to delete object {storage_object.oid}"):
                with pytest.raises(Exception, match=OBJECT_IS_LOCKED):
                    delete_object(
                        storage_object.wallet_file_path,
                        storage_object.cid,
                        storage_object.oid,
                        self.shell,
                        self.cluster.default_rpc_endpoint,
                    )

        with allure.step("Tick two epochs"):
            self.tick_epoch()
            self.tick_epoch()

        with expect_not_raises():
            delete_objects(storage_objects, self.shell, self.cluster)

    @allure.title("Outdated lock cannot be applied (obj_size={object_size})")
    def test_already_outdated_lock_should_not_be_applied(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        Already outdated lock should not be applied
        """

        current_epoch = self.ensure_fresh_epoch()

        storage_object = user_container.generate_object(
            object_size.value, expire_at=current_epoch + 1
        )

        expiration_epoch = current_epoch - 1
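        # expire_at points to an epoch that has already passed, so the node must
        # reject the lock as outdated.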
        with pytest.raises(
            Exception,
            match=LOCK_OBJECT_EXPIRATION.format(
                expiration_epoch=expiration_epoch, current_epoch=current_epoch
            ),
        ):
            lock_object(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
                expire_at=expiration_epoch,
            )

    @allure.title("Delete object when lock is expired by lifetime (obj_size={object_size})")
    @expect_not_raises()
    def test_after_lock_expiration_with_lifetime_user_should_be_able_to_delete_object(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        After the lifetime-based lock expires, the user should be able to delete the object
        """

        current_epoch = self.ensure_fresh_epoch()
        storage_object = user_container.generate_object(
            object_size.value, expire_at=current_epoch + 5
        )

        lock_object(
            storage_object.wallet_file_path,
            storage_object.cid,
            storage_object.oid,
            self.shell,
            self.cluster.default_rpc_endpoint,
            lifetime=1,
        )

        self.tick_epochs(2)
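        # The lock lifetime was a single epoch, so after two ticks it has expired
        # and the delete must succeed.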
        with expect_not_raises():
            delete_object(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
            )

    @allure.title("Delete object when lock is expired by expire_at (obj_size={object_size})")
    @expect_not_raises()
    def test_after_lock_expiration_with_expire_at_user_should_be_able_to_delete_object(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        After the expire_at-based lock expires, the user should be able to delete the object
        """

        current_epoch = self.ensure_fresh_epoch()

        storage_object = user_container.generate_object(
            object_size.value, expire_at=current_epoch + 5
        )

        lock_object(
            storage_object.wallet_file_path,
            storage_object.cid,
            storage_object.oid,
            self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            expire_at=current_epoch + 1,
        )

        self.tick_epochs(2)

        with expect_not_raises():
            delete_object(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
            )

    @allure.title("Complex object chunks are protected from deletion")
    @pytest.mark.parametrize(
        # Only complex objects are required for this test
        "object_size",
        ["complex"],
        indirect=True,
    )
    def test_complex_object_chunks_should_also_be_protected_from_deletion(
        self,
        locked_storage_object: StorageObjectInfo,
    ):
        """
        Complex object chunks should also be protected from deletion
        """

        chunk_object_ids = get_storage_object_chunks(
            locked_storage_object, self.shell, self.cluster
        )
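        # Chunk (child) objects of a locked complex object are protected the same
        # way as the parent object itself.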
        for chunk_object_id in chunk_object_ids:
            with allure.step(f"Try to delete chunk object {chunk_object_id}"):
                with pytest.raises(Exception, match=OBJECT_IS_LOCKED):
                    delete_object(
                        locked_storage_object.wallet_file_path,
                        locked_storage_object.cid,
                        chunk_object_id,
                        self.shell,
                        self.cluster.default_rpc_endpoint,
                    )

    @allure.title("Drop link object of locked complex object")
    @pytest.mark.grpc_control
    @pytest.mark.parametrize(
        "object_size",
        # Only a complex object is required
        ["complex"],
        indirect=True,
    )
    def test_link_object_of_locked_complex_object_can_be_dropped(
        self, new_locked_storage_object: StorageObjectInfo
    ):
        link_object_id = get_link_object(
            new_locked_storage_object.wallet_file_path,
            new_locked_storage_object.cid,
            new_locked_storage_object.oid,
            self.shell,
            self.cluster.storage_nodes,
        )

        with allure.step(f"Drop link object with id {link_object_id} from nodes"):
            nodes_with_object = get_nodes_with_object(
                new_locked_storage_object.cid,
                link_object_id,
                shell=self.shell,
                nodes=self.cluster.storage_nodes,
            )
            for node in nodes_with_object:
                with expect_not_raises():
                    drop_object(node, new_locked_storage_object.cid, link_object_id)

    @allure.title("Drop chunks of locked complex object")
    @pytest.mark.grpc_control
    @pytest.mark.parametrize(
        "object_size",
        # Only a complex object is required
        ["complex"],
        indirect=True,
    )
    def test_chunks_of_locked_complex_object_can_be_dropped(
        self, new_locked_storage_object: StorageObjectInfo
    ):
        chunk_objects = get_storage_object_chunks(
            new_locked_storage_object, self.shell, self.cluster
        )

        for chunk_object_id in chunk_objects:
            with allure.step(f"Drop chunk object with id {chunk_object_id} from nodes"):
                nodes_with_object = get_nodes_with_object(
                    new_locked_storage_object.cid,
                    chunk_object_id,
                    shell=self.shell,
                    nodes=self.cluster.storage_nodes,
                )
                for node in nodes_with_object:
                    with expect_not_raises():
                        drop_object(node, new_locked_storage_object.cid, chunk_object_id)

    @allure.title("Drop locked object (obj_size={object_size})")
    @pytest.mark.grpc_control
    def test_locked_object_can_be_dropped(self, new_locked_storage_object: StorageObjectInfo):
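        # Dropping is a node-local control operation, so it is expected to
        # succeed even though the object is locked.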
        nodes_with_object = get_nodes_with_object(
            new_locked_storage_object.cid,
            new_locked_storage_object.oid,
            shell=self.shell,
            nodes=self.cluster.storage_nodes,
        )

        for node in nodes_with_object:
            with expect_not_raises():
                drop_object(node, new_locked_storage_object.cid, new_locked_storage_object.oid)

    @allure.title("Link object of complex object is protected from deletion")
    @pytest.mark.parametrize(
        # Only complex objects are required for this test
        "object_size",
        ["complex"],
        indirect=True,
    )
    def test_link_object_of_complex_object_should_also_be_protected_from_deletion(
        self,
        locked_storage_object: StorageObjectInfo,
    ):
        """
        Link object of complex object should also be protected from deletion
        """

        link_object_id = get_link_object(
            locked_storage_object.wallet_file_path,
            locked_storage_object.cid,
            locked_storage_object.oid,
            self.shell,
            self.cluster.storage_nodes,
            is_direct=False,
        )
        with allure.step(f"Try to delete link object {link_object_id}"):
            with pytest.raises(Exception, match=OBJECT_IS_LOCKED):
                delete_object(
                    locked_storage_object.wallet_file_path,
                    locked_storage_object.cid,
                    link_object_id,
                    self.shell,
                    self.cluster.default_rpc_endpoint,
                )

    @allure.title("Expired object is removed after all locks are expired (obj_size={object_size})")
    def test_expired_object_should_be_removed_after_relocks_expire_at(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        current_epoch = self.ensure_fresh_epoch()
        storage_object = user_container.generate_object(
            object_size.value, expire_at=current_epoch + 1
        )
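        # Re-lock pattern: every next lock is applied before the previous one
        # expires, so the object stays protected across the whole chain of locks.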

        with allure.step("Apply first lock to object for 3 epochs"):
            lock_object_id_0 = lock_object(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
                expire_at=current_epoch + 3,
            )

        self.tick_epochs(2)

        with allure.step("Check first lock is still available"):
            verify_object_available(
                storage_object.wallet_file_path,
                storage_object.cid,
                lock_object_id_0,
                self.shell,
                self.cluster.default_rpc_endpoint,
            )

        with allure.step("Apply second lock to object for 3 more epochs"):
            lock_object_id_1 = lock_object(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
                expire_at=current_epoch + 5,
            )

        self.tick_epochs(2)

        with allure.step("Verify first lock is expired and removed"):
            check_object_not_found(
                storage_object.wallet_file_path,
                storage_object.cid,
                lock_object_id_0,
                self.shell,
                self.cluster.default_rpc_endpoint,
            )

        with allure.step("Verify second lock is still available"):
            verify_object_available(
                storage_object.wallet_file_path,
                storage_object.cid,
                lock_object_id_1,
                self.shell,
                self.cluster.default_rpc_endpoint,
            )

        with allure.step("Apply third lock to object for 3 more epochs"):
            lock_object(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
                expire_at=current_epoch + 7,
            )

        with allure.step("Verify object is deleted after all locks are expired"):
            self.tick_epochs(4)
            check_object_not_found(
                storage_object.wallet_file_path,
                storage_object.cid,
                storage_object.oid,
                self.shell,
                self.cluster.default_rpc_endpoint,
            )

    @allure.title(
        "Two expired objects with one lock are deleted after lock expiration (obj_size={object_size})"
    )
    def test_two_objects_expiration_with_one_lock(
        self,
        user_container: StorageContainer,
        object_size: ObjectSize,
    ):
        """
        Two objects covered by a single lock should be removed only after that lock expires
        """
        current_epoch = self.ensure_fresh_epoch()
        storage_objects: list[StorageObjectInfo] = []

        with allure.step("Generate two objects"):
            for epoch_i in range(2):
                storage_objects.append(
                    user_container.generate_object(
                        object_size.value, expire_at=current_epoch + epoch_i + 3
                    )
                )

        self.tick_epoch()

        with allure.step("Lock objects for 4 epochs"):
            lock_object(
                storage_objects[0].wallet_file_path,
                storage_objects[0].cid,
                ",".join([storage_object.oid for storage_object in storage_objects]),
                self.shell,
                self.cluster.default_rpc_endpoint,
                expire_at=current_epoch + 4,
            )

        with allure.step("Verify objects are available during next three epochs"):
            for epoch_i in range(3):
                self.tick_epoch()
                with allure.step(f"Check objects at epoch {current_epoch + epoch_i + 2}"):
                    for storage_object in storage_objects:
                        verify_object_available(
                            storage_object.wallet_file_path,
                            storage_object.cid,
                            storage_object.oid,
                            self.shell,
                            self.cluster.default_rpc_endpoint,
                        )

        with allure.step("Verify objects are deleted after lock was expired"):
            self.tick_epoch()
            for storage_object in storage_objects:
                check_object_not_found(
                    storage_object.wallet_file_path,
                    storage_object.cid,
                    storage_object.oid,
                    self.shell,
                    self.cluster.default_rpc_endpoint,
                )